diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2a3c07c2efd9ecbd0b17c8b62b0653d4bbcd0589..304d1bb695086233353363d414f03e5e8a6994e9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -239,20 +239,29 @@ void syncNodeClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); -// option -bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex); +// on message --------------------- +int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); +int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); +int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); +int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); + // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); - int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode); - int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); @@ -271,7 +280,6 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm); void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); - void syncNodeCandidate2Leader(SSyncNode* pSyncNode); void syncNodeFollower2Candidate(SSyncNode* pSyncNode); void syncNodeLeader2Follower(SSyncNode* pSyncNode); @@ -310,8 +318,7 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); -bool syncNodeCanChange(SSyncNode* pSyncNode); - +bool syncNodeCanChange(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); @@ -321,38 +328,6 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); bool syncNodeIsMnode(SSyncNode* pSyncNode); int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); -// trace log -void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); -void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); - -void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); -void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); - -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); -void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); - -void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); -void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); - -void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); -void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); - -void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); -void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); - -void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); -void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); - -void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); -void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); - -// syncUtil.h -void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...); -void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, - const char* format, ...); -void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, - const char* format, ...); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 8b98cf0e69a4dd2a2a852fb81c8a360d4f2593d3..92e7b555a44e5fda4d1012913dcc906f8d876f3c 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -224,8 +224,6 @@ typedef enum { SYNC_LOCAL_CMD_FOLLOWER_CMT, } ESyncLocalCmd; -const char* syncLocalCmdGetStr(int32_t cmd); - typedef struct SyncLocalCmd { uint32_t bytes; int32_t vgId; @@ -236,49 +234,8 @@ typedef struct SyncLocalCmd { int32_t cmd; SyncTerm sdNewTerm; // step down new term SyncIndex fcIndex; // follower commit index - } SyncLocalCmd; -SyncLocalCmd* syncLocalCmdBuild(int32_t vgId); -void syncLocalCmdDestroy(SyncLocalCmd* pMsg); -void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen); -void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg); -char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len); -SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len); -void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg); -void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg); -SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg); -char* syncLocalCmd2Str(const SyncLocalCmd* pMsg); - -// for debug ---------------------- -void syncLocalCmdPrint(const SyncLocalCmd* pMsg); -void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg); -void syncLocalCmdLog(const SyncLocalCmd* pMsg); -void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); - -// on message ---------------------- -int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg); - -int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); - -int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); -int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); - -// ----------------------------------------- - -// option ---------------------------------- -bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); -ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); - -const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); - int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode); int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seq, bool isWeak, int32_t vgId); int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId); @@ -294,6 +251,10 @@ int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId); + +const char* syncTimerTypeStr(ESyncTimeoutType timerType); +const char* syncLocalCmdGetStr(ESyncLocalCmd cmd); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 85c8e8f58f2e50a1a33265c4ebb9f431ccdabce5..66f6e6ee181201a75719cb164d15cfd1dcfb057b 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -34,7 +34,7 @@ extern "C" { // /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]] // /\ UNCHANGED <> // -int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 076101ef4365fc976e5da34a24b4c4e93f4cfda4..d920e98ab030419eb1dc6410b937abfd4f5725e8 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -48,6 +48,39 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, const char* format, ...); +void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); + +void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); +void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); + +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); + +void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); + +void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); +void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); + +void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); +void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); + +void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); +void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); + +void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); +void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); + +void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); +void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); + +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); +void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s); + +void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); +void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 0b6119457a5bfd1d507200472b22690fc1686208..4551702b34b1d75c3a7029a277badd6dbbc2081d 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -124,18 +124,6 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { return 0; } -void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen, s); -} - int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = pRpcMsg->pCont; SRpcMsg rpcRsp = {0}; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 6b2bf250ad59b8fa85c69373205d62e7d61dc9d3..94c4de3c5e75da0ae0ce1e87521bd85a95482884 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -15,12 +15,13 @@ #define _DEFAULT_SOURCE #include "syncAppendEntriesReply.h" -#include "syncMessage.h" #include "syncCommit.h" #include "syncIndexMgr.h" +#include "syncMessage.h" #include "syncRaftStore.h" #include "syncReplication.h" #include "syncSnapshot.h" +#include "syncUtil.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 9f0839fc0209b862a40771a8721feacb664118b6..5c41e43fd4e6c8baf02ccca2f7ebea06de950ff6 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -33,14 +33,6 @@ // mdest |-> j]) // /\ UNCHANGED <> -static void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host, - port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); -} - static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { if (pNode->state != TAOS_SYNC_STATE_CANDIDATE) { sNTrace(pNode, "not candidate, stop elect"); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 75d3b34f218723db3ea290994054a69b149b6e0d..f4cc8627f34bf798ef1cd6c25b53d7313a8cbf1f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -138,7 +138,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { code = syncNodeOnHeartbeatReply(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - code = syncNodeOnTimer(pSyncNode, pMsg); + code = syncNodeOnTimeout(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { @@ -154,9 +154,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { code = syncNodeOnSnapshotReply(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { - SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); - code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); - syncLocalCmdDestroy(pSyncMsg); + code = syncNodeOnLocalCmd(pSyncNode, pMsg); } else { sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); code = -1; @@ -1036,9 +1034,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode); } -// option -// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; } - ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotStrategy; } // timer control -------------- @@ -2017,13 +2012,13 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { // syncNodeFollowerCommit(ths, pMsg->commitIndex); - SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId); + SRpcMsg rpcMsgLocalCmd = {0}; + (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); + + SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; pSyncMsg->fcIndex = pMsg->commitIndex; - SRpcMsg rpcMsgLocalCmd; - syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); - if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); if (code != 0) { @@ -2038,13 +2033,13 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { // syncNodeStepDown(ths, pMsg->term); - SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId); + SRpcMsg rpcMsgLocalCmd = {0}; + (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); + + SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; pSyncMsg->sdNewTerm = pMsg->term; - SRpcMsg rpcMsgLocalCmd; - syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); - if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); if (code != 0) { @@ -2054,8 +2049,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm); } } - - syncLocalCmdDestroy(pSyncMsg); } /* @@ -2079,7 +2072,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { return 0; } -int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { +int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + SyncLocalCmd* pMsg = pRpcMsg->pCont; syncLogRecvLocalCmd(ths, pMsg, ""); if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { @@ -2486,161 +2480,3 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { return true; } - -const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) { - if (timerType == SYNC_TIMEOUT_PING) { - return "ping"; - } else if (timerType == SYNC_TIMEOUT_ELECTION) { - return "elect"; - } else if (timerType == SYNC_TIMEOUT_HEARTBEAT) { - return "heartbeat"; - } else { - return "unknown"; - } -} - -void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { - sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s", - syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); -} - -void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); -} - -void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); -} - -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); -} - -void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); -} - -void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->privateTerm, s); -} - -void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->privateTerm, s); -} - -void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { - sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, - syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); -} - -void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); -} - -void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); -} - -void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->snapStart, s); -} - -void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, - port, pMsg->term, pMsg->snapStart, s); -} - -void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", seq:%d}, %s", - host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); -} - -void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", seq:%d, len:%u}, %s", - host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, - pMsg->dataLen, s); -} - -void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", ack:%d}, %s", - host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); -} - -void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", ack:%d}, %s", - host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); -} diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 2b5b205f56cf2d5585ccf3ae429d5d4cca1482e1..ce984199803fa07512cb0aee57bc95126dd9f820 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -15,10 +15,7 @@ #define _DEFAULT_SOURCE #include "syncMessage.h" -#include "syncRaftCfg.h" #include "syncRaftEntry.h" -#include "syncUtil.h" -#include "tcoding.h" int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, SSyncNode* pNode) { @@ -189,6 +186,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } +#if 0 int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) { int32_t bytes = sizeof(SyncPreSnapshot); pMsg->pCont = rpcMallocCont(bytes); @@ -222,6 +220,7 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) { pPreSnapshotReply->vgId = vgId; return 0; } +#endif int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { int32_t bytes = sizeof(SyncSnapshotSend) + dataLen; @@ -275,164 +274,43 @@ int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) { return 0; } -const char* syncLocalCmdGetStr(int32_t cmd) { - if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) { - return "step-down"; - } else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { - return "follower-commit"; - } - - return "unknown-local-cmd"; -} - -SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) { - uint32_t bytes = sizeof(SyncLocalCmd); - SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) { + int32_t bytes = sizeof(SyncLocalCmd); + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_LOCAL_CMD; - return pMsg; -} - -void syncLocalCmdDestroy(SyncLocalCmd* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } -} - -void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} -void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncLocalCmdSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncLocalCmdDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) { - syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; + SyncLocalCmd* pLocalCmd = pMsg->pCont; + pLocalCmd->bytes = bytes; + pLocalCmd->msgType = TDMT_SYNC_LOCAL_CMD; + pLocalCmd->vgId = vgId; + return 0; } -cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { - char u64buf[128]; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); - cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex); - cJSON_AddStringToObject(pRoot, "fc-index", u64buf); +const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) { + switch (timerType) { + case SYNC_TIMEOUT_PING: + return "ping"; + case SYNC_TIMEOUT_ELECTION: + return "elect"; + case SYNC_TIMEOUT_HEARTBEAT: + return "heartbeat"; + default: + return "unknown"; } - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot); - return pJson; -} - -char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) { - cJSON* pJson = syncLocalCmd2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncLocalCmdPrint(const SyncLocalCmd* pMsg) { - char* serialized = syncLocalCmd2Str(pMsg); - printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) { - char* serialized = syncLocalCmd2Str(pMsg); - printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncLocalCmdLog(const SyncLocalCmd* pMsg) { - char* serialized = syncLocalCmd2Str(pMsg); - sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); } -void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncLocalCmd2Str(pMsg); - sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); +const char* syncLocalCmdGetStr(ESyncLocalCmd cmd) { + switch (cmd) { + case SYNC_LOCAL_CMD_STEP_DOWN: + return "step-down"; + case SYNC_LOCAL_CMD_FOLLOWER_CMT: + return "follower-commit"; + default: + return "unknown-local-cmd"; } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 2cdb18a1095d957564d30348c6e4b99fa8c6e3a0..7e4b18ab886fd8993a79ae06f9dab16d5b64d1cb 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -17,6 +17,7 @@ #include "syncRaftLog.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" +#include "syncUtil.h" // log[m .. n] diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 0f2057210c0245d6f11a9b98b9cbf9aa85e74427..f09bac21396cd0bb4a56c0fbac46176f605ea457 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -137,17 +137,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { return 0; } -static void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, - pMsg->dataLen, s); -} - int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncAppendEntries* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index e2cce286719270817c9d43352b65cffafd7f5717..8ffc22ee255db30c81a20e7fd06fc629b43e1da9 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -88,23 +88,6 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM return false; } -static void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { - char logBuf[256]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); -} - -static void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, - pMsg->voteGranted, s); -} - int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncRequestVote* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a9d363630ec30c82b99dd7bcb3e4fce678bbb960..563f475070a1ec63a3af73dcf9ed6205754b04b4 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -38,14 +38,6 @@ // /\ UNCHANGED <> // -static void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, - pMsg->voteGranted, s); -} - int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncRequestVoteReply* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index de8f1927ae4d4d1afe0ce7b744f35796d315157a..049b02d73e6a89344aa0266fc086cc69db943cb5 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -17,6 +17,7 @@ #include "syncRespMgr.h" #include "syncRaftEntry.h" #include "syncRaftStore.h" +#include "syncUtil.h" SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr)); diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 9bf2bb569759c13cf803f1abe30de9da30abd20b..3d4583aadbb59208137210ce59ac83046300d171 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -19,12 +19,13 @@ #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncReplication.h" +#include "syncUtil.h" static void syncNodeCleanConfigIndex(SSyncNode* ths) { int32_t newArrIndex = 0; SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT] = {0}; SSnapshot snapshot = {0}; - + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); if (snapshot.lastApplyIndex != SYNC_INDEX_INVALID) { for (int32_t i = 0; i < ths->pRaftCfg->configIndexCount; ++i) { @@ -85,7 +86,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { return 0; } -int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pRpc) { +int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) { int32_t ret = 0; SyncTimeout* pMsg = pRpc->pCont; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 894aa05aad0d2ead49cd672744eaf0f505d0af0e..a575de7e56f10cdf4f3dbda8d24bad858e350760 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncUtil.h" +#include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" #include "syncSnapshot.h" @@ -345,3 +346,205 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); } + +void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { + sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s", + syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); +} + +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { + sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, + syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); +} + +void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); +} + +void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); +} + +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); +} + +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); +} + +void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->privateTerm, s); +} + +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->privateTerm, s); +} + +void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); +} + +void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); +} + +void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->snapStart, s); +} + +void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, + port, pMsg->term, pMsg->snapStart, s); +} + +void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", seq:%d}, %s", + host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); +} + +void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", seq:%d, len:%u}, %s", + host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, + pMsg->dataLen, s); +} + +void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", ack:%d}, %s", + host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); +} + +void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", ack:%d}, %s", + host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); +} + +void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen, s); +} + +void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, + "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen, s); +} + +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { + char logBuf[256]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); +} + +void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host, + port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); +} + +void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, + pMsg->voteGranted, s); +} + +void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, + pMsg->voteGranted, s); +} \ No newline at end of file diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 4784587f4a2d77a0144e3062b72a1b9f35b4ff1d..dd67b6148e9f23bff48c38aa07cb75881ce0602b 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -457,6 +457,24 @@ SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg); cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg); char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg); +SyncLocalCmd* syncLocalCmdBuild(int32_t vgId); +void syncLocalCmdDestroy(SyncLocalCmd* pMsg); +void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen); +void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg); +char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len); +SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len); +void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg); +void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg); +SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg); +char* syncLocalCmd2Str(const SyncLocalCmd* pMsg); + +// for debug ---------------------- +void syncLocalCmdPrint(const SyncLocalCmd* pMsg); +void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg); +void syncLocalCmdLog(const SyncLocalCmd* pMsg); +void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index 5a32e809bfc077ae92255ae3da0c31387e3bd141..1ea7629601a4a7dc9846266e6652b3b4caf5b06a 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -2753,3 +2753,155 @@ char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) { cJSON_Delete(pJson); return serialized; } + +SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncLocalCmd); + SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_LOCAL_CMD; + return pMsg; +} + +void syncLocalCmdDestroy(SyncLocalCmd* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncLocalCmdSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncLocalCmdDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) { + syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); + cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex); + cJSON_AddStringToObject(pRoot, "fc-index", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot); + return pJson; +} + +char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) { + cJSON* pJson = syncLocalCmd2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncLocalCmdPrint(const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncLocalCmdLog(const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncLocalCmd2Str(pMsg); + sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} \ No newline at end of file