未验证 提交 c196eb87 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18101 from taosdata/fix/TD-20052

refact: adjust sync log prints and local cmds
......@@ -239,20 +239,29 @@ void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
// option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
// on message ---------------------
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
// timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
......@@ -271,7 +280,6 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
......@@ -310,8 +318,7 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
bool syncNodeCanChange(SSyncNode* pSyncNode);
bool syncNodeCanChange(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
......@@ -321,38 +328,6 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
bool syncNodeIsMnode(SSyncNode* pSyncNode);
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
// trace log
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
// syncUtil.h
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...);
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
const char* format, ...);
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
const char* format, ...);
#ifdef __cplusplus
}
#endif
......
......@@ -224,8 +224,6 @@ typedef enum {
SYNC_LOCAL_CMD_FOLLOWER_CMT,
} ESyncLocalCmd;
const char* syncLocalCmdGetStr(int32_t cmd);
typedef struct SyncLocalCmd {
uint32_t bytes;
int32_t vgId;
......@@ -236,49 +234,8 @@ typedef struct SyncLocalCmd {
int32_t cmd;
SyncTerm sdNewTerm; // step down new term
SyncIndex fcIndex; // follower commit index
} SyncLocalCmd;
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId);
void syncLocalCmdDestroy(SyncLocalCmd* pMsg);
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen);
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg);
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len);
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len);
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg);
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg);
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg);
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg);
// for debug ----------------------
void syncLocalCmdPrint(const SyncLocalCmd* pMsg);
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg);
void syncLocalCmdLog(const SyncLocalCmd* pMsg);
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
// on message ----------------------
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
// -----------------------------------------
// option ----------------------------------
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seq, bool isWeak, int32_t vgId);
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
......@@ -294,6 +251,10 @@ int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId,
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId);
const char* syncTimerTypeStr(ESyncTimeoutType timerType);
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd);
#ifdef __cplusplus
}
......
......@@ -34,7 +34,7 @@ extern "C" {
// /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
// /\ UNCHANGED <<messages, leaderVars, logVars>>
//
int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
#ifdef __cplusplus
}
......
......@@ -48,6 +48,39 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
const char* format, ...);
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
#ifdef __cplusplus
}
#endif
......
......@@ -124,18 +124,6 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
return 0;
}
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen, s);
}
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont;
SRpcMsg rpcRsp = {0};
......
......@@ -15,12 +15,13 @@
#define _DEFAULT_SOURCE
#include "syncAppendEntriesReply.h"
#include "syncMessage.h"
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
......
......@@ -33,14 +33,6 @@
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
static void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
if (pNode->state != TAOS_SYNC_STATE_CANDIDATE) {
sNTrace(pNode, "not candidate, stop elect");
......
......@@ -138,7 +138,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
code = syncNodeOnTimer(pSyncNode, pMsg);
code = syncNodeOnTimeout(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
......@@ -154,9 +154,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
code = syncNodeOnSnapshotReply(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
syncLocalCmdDestroy(pSyncMsg);
code = syncNodeOnLocalCmd(pSyncNode, pMsg);
} else {
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
code = -1;
......@@ -1036,9 +1034,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode);
}
// option
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotStrategy; }
// timer control --------------
......@@ -2017,13 +2012,13 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeFollowerCommit(ths, pMsg->commitIndex);
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
pSyncMsg->fcIndex = pMsg->commitIndex;
SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
......@@ -2038,13 +2033,13 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeStepDown(ths, pMsg->term);
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
pSyncMsg->sdNewTerm = pMsg->term;
SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
......@@ -2054,8 +2049,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm);
}
}
syncLocalCmdDestroy(pSyncMsg);
}
/*
......@@ -2079,7 +2072,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
return 0;
}
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pMsg = pRpcMsg->pCont;
syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
......@@ -2486,161 +2480,3 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
return true;
}
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
if (timerType == SYNC_TIMEOUT_PING) {
return "ping";
} else if (timerType == SYNC_TIMEOUT_ELECTION) {
return "elect";
} else if (timerType == SYNC_TIMEOUT_HEARTBEAT) {
return "heartbeat";
} else {
return "unknown";
}
}
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
}
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
}
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
}
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->snapStart, s);
}
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->snapStart, s);
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
}
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d, len:%u}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq,
pMsg->dataLen, s);
}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
......@@ -15,10 +15,7 @@
#define _DEFAULT_SOURCE
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftEntry.h"
#include "syncUtil.h"
#include "tcoding.h"
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
SSyncNode* pNode) {
......@@ -189,6 +186,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
return 0;
}
#if 0
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncPreSnapshot);
pMsg->pCont = rpcMallocCont(bytes);
......@@ -222,6 +220,7 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
pPreSnapshotReply->vgId = vgId;
return 0;
}
#endif
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
int32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
......@@ -275,164 +274,43 @@ int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
return 0;
}
const char* syncLocalCmdGetStr(int32_t cmd) {
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
return "step-down";
} else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
return "follower-commit";
}
return "unknown-local-cmd";
}
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncLocalCmd);
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncLocalCmd);
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
return pMsg;
}
void syncLocalCmdDestroy(SyncLocalCmd* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncLocalCmdSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncLocalCmdDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) {
syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
SyncLocalCmd* pLocalCmd = pMsg->pCont;
pLocalCmd->bytes = bytes;
pLocalCmd->msgType = TDMT_SYNC_LOCAL_CMD;
pLocalCmd->vgId = vgId;
return 0;
}
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex);
cJSON_AddStringToObject(pRoot, "fc-index", u64buf);
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
switch (timerType) {
case SYNC_TIMEOUT_PING:
return "ping";
case SYNC_TIMEOUT_ELECTION:
return "elect";
case SYNC_TIMEOUT_HEARTBEAT:
return "heartbeat";
default:
return "unknown";
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot);
return pJson;
}
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) {
cJSON* pJson = syncLocalCmd2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncLocalCmdPrint(const SyncLocalCmd* pMsg) {
char* serialized = syncLocalCmd2Str(pMsg);
printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) {
char* serialized = syncLocalCmd2Str(pMsg);
printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncLocalCmdLog(const SyncLocalCmd* pMsg) {
char* serialized = syncLocalCmd2Str(pMsg);
sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncLocalCmd2Str(pMsg);
sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd) {
switch (cmd) {
case SYNC_LOCAL_CMD_STEP_DOWN:
return "step-down";
case SYNC_LOCAL_CMD_FOLLOWER_CMT:
return "follower-commit";
default:
return "unknown-local-cmd";
}
}
......@@ -17,6 +17,7 @@
#include "syncRaftLog.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
// log[m .. n]
......
......@@ -137,17 +137,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
return 0;
}
static void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, s);
}
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncAppendEntries* pMsg = pRpcMsg->pCont;
......
......@@ -88,23 +88,6 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
return false;
}
static void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
static void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncRequestVote* pMsg = pRpcMsg->pCont;
......
......@@ -38,14 +38,6 @@
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
//
static void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncRequestVoteReply* pMsg = pRpcMsg->pCont;
......
......@@ -17,6 +17,7 @@
#include "syncRespMgr.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr));
......
......@@ -19,12 +19,13 @@
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncReplication.h"
#include "syncUtil.h"
static void syncNodeCleanConfigIndex(SSyncNode* ths) {
int32_t newArrIndex = 0;
SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT] = {0};
SSnapshot snapshot = {0};
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex != SYNC_INDEX_INVALID) {
for (int32_t i = 0; i < ths->pRaftCfg->configIndexCount; ++i) {
......@@ -85,7 +86,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
return 0;
}
int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pRpc) {
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
int32_t ret = 0;
SyncTimeout* pMsg = pRpc->pCont;
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "syncUtil.h"
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
......@@ -345,3 +346,205 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
}
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
}
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
}
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
}
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->snapStart, s);
}
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->snapStart, s);
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
}
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d, len:%u}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq,
pMsg->dataLen, s);
}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen, s);
}
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, s);
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
\ No newline at end of file
......@@ -457,6 +457,24 @@ SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg);
char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg);
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId);
void syncLocalCmdDestroy(SyncLocalCmd* pMsg);
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen);
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg);
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len);
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len);
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg);
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg);
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg);
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg);
// for debug ----------------------
void syncLocalCmdPrint(const SyncLocalCmd* pMsg);
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg);
void syncLocalCmdLog(const SyncLocalCmd* pMsg);
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -2753,3 +2753,155 @@ char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) {
cJSON_Delete(pJson);
return serialized;
}
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncLocalCmd);
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
return pMsg;
}
void syncLocalCmdDestroy(SyncLocalCmd* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncLocalCmdSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncLocalCmdDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) {
syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex);
cJSON_AddStringToObject(pRoot, "fc-index", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot);
return pJson;
}
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) {
cJSON* pJson = syncLocalCmd2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncLocalCmdPrint(const SyncLocalCmd* pMsg) {
char* serialized = syncLocalCmd2Str(pMsg);
printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) {
char* serialized = syncLocalCmd2Str(pMsg);
printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncLocalCmdLog(const SyncLocalCmd* pMsg) {
char* serialized = syncLocalCmd2Str(pMsg);
sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncLocalCmd2Str(pMsg);
sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册