diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a27be95049d64b8ff68bc2f75fd62e49f32a47cd..0b997690a18e3d4befe7c0f50701eafdb6879843 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -58,7 +58,6 @@ typedef int64_t SyncIndex; typedef uint64_t SyncTerm; typedef struct SSyncNode SSyncNode; -typedef struct SSyncBuffer SSyncBuffer; typedef struct SWal SWal; typedef struct SSyncRaftEntry SSyncRaftEntry; diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 0a67939d356c80208caf431ca8fff5bbd7760328..a87a28baf524898937ad4838fd08d80d451f38e4 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -21,7 +21,6 @@ extern "C" { #endif #include "syncInt.h" -#include "syncMessage.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index d2dff92d43f31bc2c05c7e9d3a1a476ccd06ffbf..09750864d596932efa42dda30f92c58075f644ca 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -21,7 +21,6 @@ extern "C" { #endif #include "syncInt.h" -#include "syncMessage.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index bd88f5cdce270ac138d12fb89b4af4d4eb94ce29..79b4fa0fbf06be6d1c16e783ebc59638fff1a5b5 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -41,22 +41,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); -char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); -void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); -int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime); -int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); - -// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); -// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); - -// for debug ------------------- -void syncIndexMgrPrint(SSyncIndexMgr *pObj); -void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj); -void syncIndexMgrLog(SSyncIndexMgr *pObj); -void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj); +void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); +int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); +void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime); +int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); +void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); +SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8a951ba38d08d4fc5f7a117780ef6a1e218924a4..46bbb1442187d9bd5d08716d2f5a76792f1e48d7 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,6 @@ extern "C" { #endif #include "sync.h" -#include "syncTools.h" #include "taosdef.h" #include "tlog.h" #include "trpc.h" @@ -85,9 +84,33 @@ typedef struct SSyncSnapshotSender SSyncSnapshotSender; typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncTimer SSyncTimer; typedef struct SSyncHbTimerData SSyncHbTimerData; +typedef struct SyncSnapshotSend SyncSnapshotSend; +typedef struct SyncSnapshotRsp SyncSnapshotRsp; +typedef struct SyncLocalCmd SyncLocalCmd; +typedef struct SyncAppendEntriesBatch SyncAppendEntriesBatch; +typedef struct SyncPreSnapshotReply SyncPreSnapshotReply; +typedef struct SyncHeartbeatReply SyncHeartbeatReply; +typedef struct SyncHeartbeat SyncHeartbeat; +typedef struct SyncPreSnapshot SyncPreSnapshot; + +typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); +typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); +typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); +typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg); +typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg); +typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg); +typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg); +typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); +typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); extern bool gRaftDetailLog; +typedef struct SRaftId { + SyncNodeId addr; + SyncGroupId vgId; +} SRaftId; + typedef struct SSyncHbTimerData { SSyncNode* pSyncNode; SSyncTimer* pTimer; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 936081c7b2d901881fdfc977ac9f628bf126cca2..7dc04d0b86c8908e471cef89d120a5761662ae65 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -23,15 +23,666 @@ extern "C" { #include "syncInt.h" // --------------------------------------------- -cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); -cJSON* syncRpcUnknownMsg2Json(); -char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); +typedef struct SyncPing { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + uint32_t dataLen; + char data[]; +} SyncPing; + + +void syncPingDestroy(SyncPing* pMsg); +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); +SyncPing* syncPingDeserialize2(const char* buf, uint32_t len); +SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg); + +// --------------------------------------------- +typedef struct SyncPingReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + uint32_t dataLen; + char data[]; +} SyncPingReply; + +SyncPingReply* syncPingReplyBuild(uint32_t dataLen); +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); +void syncPingReplyDestroy(SyncPingReply* pMsg); +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); +char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len); +SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len); +int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen); +SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen); +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); +SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncPingReply2Json(const SyncPingReply* pMsg); +char* syncPingReply2Str(const SyncPingReply* pMsg); + +// for debug ---------------------- +void syncPingReplyPrint(const SyncPingReply* pMsg); +void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg); +void syncPingReplyLog(const SyncPingReply* pMsg); +void syncPingReplyLog2(char* s, const SyncPingReply* pMsg); + +// --------------------------------------------- +typedef enum ESyncTimeoutType { + SYNC_TIMEOUT_PING = 100, + SYNC_TIMEOUT_ELECTION, + SYNC_TIMEOUT_HEARTBEAT, +} ESyncTimeoutType; + +const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); + +typedef struct SyncTimeout { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + ESyncTimeoutType timeoutType; + uint64_t logicClock; + int32_t timerMS; + void* data; // need optimized +} SyncTimeout; + +SyncTimeout* syncTimeoutBuild(); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId, + void* data); +void syncTimeoutDestroy(SyncTimeout* pMsg); +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); +char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len); +SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len); +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); +SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncTimeout2Json(const SyncTimeout* pMsg); +char* syncTimeout2Str(const SyncTimeout* pMsg); + +// for debug ---------------------- +void syncTimeoutPrint(const SyncTimeout* pMsg); +void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg); +void syncTimeoutLog(const SyncTimeout* pMsg); +void syncTimeoutLog2(char* s, const SyncTimeout* pMsg); + +// --------------------------------------------- +typedef struct SyncClientRequest { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST + uint32_t originalRpcType; // origin RpcMsg msgType + uint64_t seqNum; + bool isWeak; + uint32_t dataLen; // origin RpcMsg.contLen + char data[]; // origin RpcMsg.pCont +} SyncClientRequest; + +SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen); +int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, + bool isWeak, int32_t vgId); +int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2 +void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); +char* syncClientRequest2Str(const SyncClientRequest* pMsg); + +// for debug ---------------------- +void syncClientRequestPrint(const SyncClientRequest* pMsg); +void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg); +void syncClientRequestLog(const SyncClientRequest* pMsg); +void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); + +// --------------------------------------------- +typedef struct SRaftMeta { + uint64_t seqNum; + bool isWeak; +} SRaftMeta; + +// block1: +// block2: SRaftMeta array +// block3: rpc msg array (with pCont pointer) + +typedef struct SyncClientRequestBatch { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH + uint32_t dataCount; + uint32_t dataLen; + char data[]; // block2, block3 +} SyncClientRequestBatch; + +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, + int32_t vgId); +void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); +void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg); +void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg); +SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg); +SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg); +SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg); +cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg); +char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg); + +// for debug ---------------------- +void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg); + +// --------------------------------------------- +typedef struct SyncClientRequestReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + int32_t errCode; + SRaftId leaderHint; +} SyncClientRequestReply; + +// --------------------------------------------- +typedef struct SyncRequestVote { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncTerm term; + SyncIndex lastLogIndex; + SyncTerm lastLogTerm; +} SyncRequestVote; + +SyncRequestVote* syncRequestVoteBuild(int32_t vgId); +void syncRequestVoteDestroy(SyncRequestVote* pMsg); +void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen); +void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg); +char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len); +SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len); +void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg); +void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg); +SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg); +char* syncRequestVote2Str(const SyncRequestVote* pMsg); + +// for debug ---------------------- +void syncRequestVotePrint(const SyncRequestVote* pMsg); +void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg); +void syncRequestVoteLog(const SyncRequestVote* pMsg); +void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg); + +// --------------------------------------------- +typedef struct SyncRequestVoteReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncTerm term; + bool voteGranted; +} SyncRequestVoteReply; + +SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId); +void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg); +void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen); +void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg); +char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len); +SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len); +void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg); +void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg); +SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg); +char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg); + +// for debug ---------------------- +void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg); + +// --------------------------------------------- +// data: entry + +typedef struct SyncAppendEntries { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncIndex prevLogIndex; + SyncTerm prevLogTerm; + SyncIndex commitIndex; + SyncTerm privateTerm; + uint32_t dataLen; + char data[]; +} SyncAppendEntries; + +SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId); +void syncAppendEntriesDestroy(SyncAppendEntries* pMsg); +void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg); +char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len); +SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len); +void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg); +SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg); +char* syncAppendEntries2Str(const SyncAppendEntries* pMsg); + +// for debug ---------------------- +void syncAppendEntriesPrint(const SyncAppendEntries* pMsg); +void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg); +void syncAppendEntriesLog(const SyncAppendEntries* pMsg); +void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); + +// --------------------------------------------- + +typedef struct SOffsetAndContLen { + int32_t offset; + int32_t contLen; +} SOffsetAndContLen; + +// data: +// block1: SOffsetAndContLen Array +// block2: entry Array + +typedef struct SyncAppendEntriesBatch { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncIndex prevLogIndex; + SyncTerm prevLogTerm; + SyncIndex commitIndex; + SyncTerm privateTerm; + int32_t dataCount; + uint32_t dataLen; + char data[]; // block1, block2 +} SyncAppendEntriesBatch; + +SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId); +SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg); +char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len); +SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len); +void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg); +SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg); + +// --------------------------------------------- +typedef struct SyncAppendEntriesReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncTerm term; + SyncTerm privateTerm; + bool success; + SyncIndex matchIndex; + SyncIndex lastSendIndex; + int64_t startTime; +} SyncAppendEntriesReply; + +SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId); +void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg); +char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len); +SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len); +void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg); +SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg); +char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg); + +// for debug ---------------------- +void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg); + +// --------------------------------------------- +typedef struct SyncHeartbeat { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncIndex commitIndex; + SyncTerm privateTerm; + SyncTerm minMatchIndex; + +} SyncHeartbeat; + +SyncHeartbeat* syncHeartbeatBuild(int32_t vgId); +void syncHeartbeatDestroy(SyncHeartbeat* pMsg); +void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen); +void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg); +char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len); +SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len); +void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg); +void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg); +SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg); +char* syncHeartbeat2Str(const SyncHeartbeat* pMsg); + +// for debug ---------------------- +void syncHeartbeatPrint(const SyncHeartbeat* pMsg); +void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg); +void syncHeartbeatLog(const SyncHeartbeat* pMsg); +void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg); + +// --------------------------------------------- +typedef struct SyncHeartbeatReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncTerm privateTerm; + int64_t startTime; +} SyncHeartbeatReply; + +SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId); +void syncHeartbeatReplyDestroy(SyncHeartbeatReply* pMsg); +void syncHeartbeatReplySerialize(const SyncHeartbeatReply* pMsg, char* buf, uint32_t bufLen); +void syncHeartbeatReplyDeserialize(const char* buf, uint32_t len, SyncHeartbeatReply* pMsg); +char* syncHeartbeatReplySerialize2(const SyncHeartbeatReply* pMsg, uint32_t* len); +SyncHeartbeatReply* syncHeartbeatReplyDeserialize2(const char* buf, uint32_t len); +void syncHeartbeatReply2RpcMsg(const SyncHeartbeatReply* pMsg, SRpcMsg* pRpcMsg); +void syncHeartbeatReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeatReply* pMsg); +SyncHeartbeatReply* syncHeartbeatReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncHeartbeatReply2Json(const SyncHeartbeatReply* pMsg); +char* syncHeartbeatReply2Str(const SyncHeartbeatReply* pMsg); + +// for debug ---------------------- +void syncHeartbeatReplyPrint(const SyncHeartbeatReply* pMsg); +void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg); +void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg); +void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg); + +// --------------------------------------------- +typedef struct SyncPreSnapshot { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + +} SyncPreSnapshot; + +SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId); +void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg); +void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen); +void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg); +char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len); +SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len); +void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg); +void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg); +SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg); +char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg); + +// for debug ---------------------- +void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg); +void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg); +void syncPreSnapshotLog(const SyncPreSnapshot* pMsg); +void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg); + +// --------------------------------------------- +typedef struct SyncPreSnapshotReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncIndex snapStart; + +} SyncPreSnapshotReply; + +SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId); +void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen); +void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg); +char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len); +SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len); +void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg); +void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg); +SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg); +char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg); + +// for debug ---------------------- +void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg); + +// --------------------------------------------- +typedef struct SyncApplyMsg { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; // user SyncApplyMsg msgType + uint32_t originalRpcType; // user RpcMsg msgType + SFsmCbMeta fsmMeta; + uint32_t dataLen; // user RpcMsg.contLen + char data[]; // user RpcMsg.pCont +} SyncApplyMsg; + +SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen); +SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta); +void syncApplyMsgDestroy(SyncApplyMsg* pMsg); +void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen); +void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg); +char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len); +SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len); +void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ +void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg +SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg); +void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg +cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg); +char* syncApplyMsg2Str(const SyncApplyMsg* pMsg); + +// for debug ---------------------- +void syncApplyMsgPrint(const SyncApplyMsg* pMsg); +void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg); +void syncApplyMsgLog(const SyncApplyMsg* pMsg); +void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg); + +// --------------------------------------------- +typedef struct SyncSnapshotSend { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + SyncTerm term; + SyncIndex beginIndex; // snapshot.beginIndex + SyncIndex lastIndex; // snapshot.lastIndex + SyncTerm lastTerm; // snapshot.lastTerm + SyncIndex lastConfigIndex; // snapshot.lastConfigIndex + SSyncCfg lastConfig; + int64_t startTime; + int32_t seq; + uint32_t dataLen; + char data[]; +} SyncSnapshotSend; + +SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId); +void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg); +void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen); +void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg); +char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len); +SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len); +void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg); +void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg); +SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg); +char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg); + +// for debug ---------------------- +void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg); +void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg); +void syncSnapshotSendLog(const SyncSnapshotSend* pMsg); +void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg); + +// --------------------------------------------- +typedef struct SyncSnapshotRsp { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + SyncTerm term; + SyncIndex lastIndex; + SyncTerm lastTerm; + int64_t startTime; + int32_t ack; + int32_t code; + SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid +} SyncSnapshotRsp; + +SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId); +void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg); +void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen); +void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg); +char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len); +SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len); +void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg); +void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg); +SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg); +char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg); + +// for debug ---------------------- +void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg); +void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg); +void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg); +void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg); + +// --------------------------------------------- +typedef struct SyncLeaderTransfer { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + /* + SRaftId srcId; + SRaftId destId; + */ + SNodeInfo newNodeInfo; + SRaftId newLeaderId; +} SyncLeaderTransfer; + +SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId); +void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg); +void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen); +void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg); +char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len); +SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len); +void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg); +void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg); +SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg); +char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg); + +typedef enum { + SYNC_LOCAL_CMD_STEP_DOWN = 100, + SYNC_LOCAL_CMD_FOLLOWER_CMT, +} ESyncLocalCmd; + +const char* syncLocalCmdGetStr(int32_t cmd); + +typedef struct SyncLocalCmd { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + int32_t cmd; + SyncTerm sdNewTerm; // step down new term + SyncIndex fcIndex;// follower commit index + +} SyncLocalCmd; + +SyncLocalCmd* syncLocalCmdBuild(int32_t vgId); +void syncLocalCmdDestroy(SyncLocalCmd* pMsg); +void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen); +void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg); +char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len); +SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len); +void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg); +void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg); +SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg); +char* syncLocalCmd2Str(const SyncLocalCmd* pMsg); // for debug ---------------------- -void syncRpcMsgPrint(SRpcMsg* pMsg); -void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); -void syncRpcMsgLog(SRpcMsg* pMsg); -void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); +void syncLocalCmdPrint(const SyncLocalCmd* pMsg); +void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg); +void syncLocalCmdLog(const SyncLocalCmd* pMsg); +void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); + +// on message ---------------------- +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); + +int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); + +int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + +int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); +int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg); + +int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg); +int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); + +int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); +int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); + +int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); +int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); + +// ----------------------------------------- + +// option ---------------------------------- +bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); +ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); + // --------------------------------------------- #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h deleted file mode 100644 index 3fb4a5ba0c6f3283c1ee602e7a84f3a8bc4aa25b..0000000000000000000000000000000000000000 --- a/source/libs/sync/inc/syncTools.h +++ /dev/null @@ -1,755 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_TOOLS_H -#define _TD_LIBS_SYNC_TOOLS_H - -#ifdef __cplusplus -extern "C" { -#endif - -// ------------------ ds ------------------- -typedef struct SRaftId { - SyncNodeId addr; - SyncGroupId vgId; -} SRaftId; - -// ------------------ for debug ------------------- -void syncRpcMsgPrint(SRpcMsg* pMsg); -void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); -void syncRpcMsgLog(SRpcMsg* pMsg); -void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); - -// ------------------ for compile ------------------- -typedef struct SSyncBuffer { - void* data; - size_t len; -} SSyncBuffer; - -typedef struct SNodesRole { - int32_t replicaNum; - SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; - ESyncState role[TSDB_MAX_REPLICA]; -} SNodesRole; - -typedef struct SStateMgr { - void* data; - - int32_t (*getCurrentTerm)(struct SStateMgr* pMgr, SyncTerm* pCurrentTerm); - int32_t (*persistCurrentTerm)(struct SStateMgr* pMgr, SyncTerm pCurrentTerm); - - int32_t (*getVoteFor)(struct SStateMgr* pMgr, SyncNodeId* pVoteFor); - int32_t (*persistVoteFor)(struct SStateMgr* pMgr, SyncNodeId voteFor); - - int32_t (*getSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg); - int32_t (*persistSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg); - -} SStateMgr; - -// ------------------ for message process ------------------- - -// --------------------------------------------- -typedef struct SyncPing { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - // private data - uint32_t dataLen; - char data[]; -} SyncPing; - -SyncPing* syncPingBuild(uint32_t dataLen); -SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); -SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); -void syncPingDestroy(SyncPing* pMsg); -void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); -char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len); -SyncPing* syncPingDeserialize2(const char* buf, uint32_t len); -int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen); -SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen); -void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); -void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); -SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncPing2Json(const SyncPing* pMsg); -char* syncPing2Str(const SyncPing* pMsg); - -// for debug ---------------------- -void syncPingPrint(const SyncPing* pMsg); -void syncPingPrint2(char* s, const SyncPing* pMsg); -void syncPingLog(const SyncPing* pMsg); -void syncPingLog2(char* s, const SyncPing* pMsg); - -// --------------------------------------------- -typedef struct SyncPingReply { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - // private data - uint32_t dataLen; - char data[]; -} SyncPingReply; - -SyncPingReply* syncPingReplyBuild(uint32_t dataLen); -SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); -SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); -void syncPingReplyDestroy(SyncPingReply* pMsg); -void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); -void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); -char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len); -SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len); -int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen); -SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen); -void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); -void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); -SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncPingReply2Json(const SyncPingReply* pMsg); -char* syncPingReply2Str(const SyncPingReply* pMsg); - -// for debug ---------------------- -void syncPingReplyPrint(const SyncPingReply* pMsg); -void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg); -void syncPingReplyLog(const SyncPingReply* pMsg); -void syncPingReplyLog2(char* s, const SyncPingReply* pMsg); - -// --------------------------------------------- -typedef enum ESyncTimeoutType { - SYNC_TIMEOUT_PING = 100, - SYNC_TIMEOUT_ELECTION, - SYNC_TIMEOUT_HEARTBEAT, -} ESyncTimeoutType; - -const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); - -typedef struct SyncTimeout { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - ESyncTimeoutType timeoutType; - uint64_t logicClock; - int32_t timerMS; - void* data; // need optimized -} SyncTimeout; - -SyncTimeout* syncTimeoutBuild(); -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId, - void* data); -void syncTimeoutDestroy(SyncTimeout* pMsg); -void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); -void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); -char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len); -SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len); -void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); -void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); -SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncTimeout2Json(const SyncTimeout* pMsg); -char* syncTimeout2Str(const SyncTimeout* pMsg); - -// for debug ---------------------- -void syncTimeoutPrint(const SyncTimeout* pMsg); -void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg); -void syncTimeoutLog(const SyncTimeout* pMsg); -void syncTimeoutLog2(char* s, const SyncTimeout* pMsg); - -// --------------------------------------------- -typedef struct SyncClientRequest { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST - uint32_t originalRpcType; // origin RpcMsg msgType - uint64_t seqNum; - bool isWeak; - uint32_t dataLen; // origin RpcMsg.contLen - char data[]; // origin RpcMsg.pCont -} SyncClientRequest; - -SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen); -int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, - bool isWeak, int32_t vgId); -int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); -void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2 -void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); -cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); -char* syncClientRequest2Str(const SyncClientRequest* pMsg); - -// for debug ---------------------- -void syncClientRequestPrint(const SyncClientRequest* pMsg); -void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg); -void syncClientRequestLog(const SyncClientRequest* pMsg); -void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); - -// --------------------------------------------- -typedef struct SRaftMeta { - uint64_t seqNum; - bool isWeak; -} SRaftMeta; - -// block1: -// block2: SRaftMeta array -// block3: rpc msg array (with pCont pointer) - -typedef struct SyncClientRequestBatch { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH - uint32_t dataCount; - uint32_t dataLen; - char data[]; // block2, block3 -} SyncClientRequestBatch; - -SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, - int32_t vgId); -void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); -void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg); -void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg); -SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg); -SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg); -SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg); -cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg); -char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg); - -// for debug ---------------------- -void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg); -void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg); -void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg); -void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg); - -// --------------------------------------------- -typedef struct SyncClientRequestReply { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - int32_t errCode; - SRaftId leaderHint; -} SyncClientRequestReply; - -// --------------------------------------------- -typedef struct SyncRequestVote { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - // private data - SyncTerm term; - SyncIndex lastLogIndex; - SyncTerm lastLogTerm; -} SyncRequestVote; - -SyncRequestVote* syncRequestVoteBuild(int32_t vgId); -void syncRequestVoteDestroy(SyncRequestVote* pMsg); -void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen); -void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg); -char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len); -SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len); -void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg); -void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg); -SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg); -char* syncRequestVote2Str(const SyncRequestVote* pMsg); - -// for debug ---------------------- -void syncRequestVotePrint(const SyncRequestVote* pMsg); -void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg); -void syncRequestVoteLog(const SyncRequestVote* pMsg); -void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg); - -// --------------------------------------------- -typedef struct SyncRequestVoteReply { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - // private data - SyncTerm term; - bool voteGranted; -} SyncRequestVoteReply; - -SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId); -void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg); -void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen); -void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg); -char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len); -SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len); -void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg); -void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg); -SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg); -char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg); - -// for debug ---------------------- -void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg); -void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg); -void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg); -void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg); - -// --------------------------------------------- -// data: entry - -typedef struct SyncAppendEntries { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - // private data - SyncTerm term; - SyncIndex prevLogIndex; - SyncTerm prevLogTerm; - SyncIndex commitIndex; - SyncTerm privateTerm; - uint32_t dataLen; - char data[]; -} SyncAppendEntries; - -SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId); -void syncAppendEntriesDestroy(SyncAppendEntries* pMsg); -void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen); -void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg); -char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len); -SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len); -void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg); -void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg); -SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg); -char* syncAppendEntries2Str(const SyncAppendEntries* pMsg); - -// for debug ---------------------- -void syncAppendEntriesPrint(const SyncAppendEntries* pMsg); -void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg); -void syncAppendEntriesLog(const SyncAppendEntries* pMsg); -void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); - -// --------------------------------------------- - -typedef struct SOffsetAndContLen { - int32_t offset; - int32_t contLen; -} SOffsetAndContLen; - -// data: -// block1: SOffsetAndContLen Array -// block2: entry Array - -typedef struct SyncAppendEntriesBatch { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - // private data - SyncTerm term; - SyncIndex prevLogIndex; - SyncTerm prevLogTerm; - SyncIndex commitIndex; - SyncTerm privateTerm; - int32_t dataCount; - uint32_t dataLen; - char data[]; // block1, block2 -} SyncAppendEntriesBatch; - -SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId); -SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg); -void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg); -void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen); -void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg); -char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len); -SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len); -void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg); -void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg); -SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg); - -// --------------------------------------------- -typedef struct SyncAppendEntriesReply { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - // private data - SyncTerm term; - SyncTerm privateTerm; - bool success; - SyncIndex matchIndex; - SyncIndex lastSendIndex; - int64_t startTime; -} SyncAppendEntriesReply; - -SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId); -void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg); -void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen); -void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg); -char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len); -SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len); -void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg); -void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg); -SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg); -char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg); - -// for debug ---------------------- -void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg); -void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg); -void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg); -void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg); - -// --------------------------------------------- -typedef struct SyncHeartbeat { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - // private data - SyncTerm term; - SyncIndex commitIndex; - SyncTerm privateTerm; - SyncTerm minMatchIndex; - -} SyncHeartbeat; - -SyncHeartbeat* syncHeartbeatBuild(int32_t vgId); -void syncHeartbeatDestroy(SyncHeartbeat* pMsg); -void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen); -void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg); -char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len); -SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len); -void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg); -void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg); -SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg); -char* syncHeartbeat2Str(const SyncHeartbeat* pMsg); - -// for debug ---------------------- -void syncHeartbeatPrint(const SyncHeartbeat* pMsg); -void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg); -void syncHeartbeatLog(const SyncHeartbeat* pMsg); -void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg); - -// --------------------------------------------- -typedef struct SyncHeartbeatReply { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - // private data - SyncTerm term; - SyncTerm privateTerm; - int64_t startTime; -} SyncHeartbeatReply; - -SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId); -void syncHeartbeatReplyDestroy(SyncHeartbeatReply* pMsg); -void syncHeartbeatReplySerialize(const SyncHeartbeatReply* pMsg, char* buf, uint32_t bufLen); -void syncHeartbeatReplyDeserialize(const char* buf, uint32_t len, SyncHeartbeatReply* pMsg); -char* syncHeartbeatReplySerialize2(const SyncHeartbeatReply* pMsg, uint32_t* len); -SyncHeartbeatReply* syncHeartbeatReplyDeserialize2(const char* buf, uint32_t len); -void syncHeartbeatReply2RpcMsg(const SyncHeartbeatReply* pMsg, SRpcMsg* pRpcMsg); -void syncHeartbeatReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeatReply* pMsg); -SyncHeartbeatReply* syncHeartbeatReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncHeartbeatReply2Json(const SyncHeartbeatReply* pMsg); -char* syncHeartbeatReply2Str(const SyncHeartbeatReply* pMsg); - -// for debug ---------------------- -void syncHeartbeatReplyPrint(const SyncHeartbeatReply* pMsg); -void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg); -void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg); -void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg); - -// --------------------------------------------- -typedef struct SyncPreSnapshot { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - // private data - SyncTerm term; - -} SyncPreSnapshot; - -SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId); -void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg); -void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen); -void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg); -char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len); -SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len); -void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg); -void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg); -SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg); -char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg); - -// for debug ---------------------- -void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg); -void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg); -void syncPreSnapshotLog(const SyncPreSnapshot* pMsg); -void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg); - -// --------------------------------------------- -typedef struct SyncPreSnapshotReply { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - // private data - SyncTerm term; - SyncIndex snapStart; - -} SyncPreSnapshotReply; - -SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId); -void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg); -void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen); -void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg); -char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len); -SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len); -void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg); -void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg); -SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg); -char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg); - -// for debug ---------------------- -void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg); -void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg); -void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg); -void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg); - -// --------------------------------------------- -typedef struct SyncApplyMsg { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; // user SyncApplyMsg msgType - uint32_t originalRpcType; // user RpcMsg msgType - SFsmCbMeta fsmMeta; - uint32_t dataLen; // user RpcMsg.contLen - char data[]; // user RpcMsg.pCont -} SyncApplyMsg; - -SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen); -SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta); -void syncApplyMsgDestroy(SyncApplyMsg* pMsg); -void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen); -void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg); -char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len); -SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len); -void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ -void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg -SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg); -void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg -cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg); -char* syncApplyMsg2Str(const SyncApplyMsg* pMsg); - -// for debug ---------------------- -void syncApplyMsgPrint(const SyncApplyMsg* pMsg); -void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg); -void syncApplyMsgLog(const SyncApplyMsg* pMsg); -void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg); - -// --------------------------------------------- -typedef struct SyncSnapshotSend { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - SyncTerm term; - SyncIndex beginIndex; // snapshot.beginIndex - SyncIndex lastIndex; // snapshot.lastIndex - SyncTerm lastTerm; // snapshot.lastTerm - SyncIndex lastConfigIndex; // snapshot.lastConfigIndex - SSyncCfg lastConfig; - int64_t startTime; - int32_t seq; - uint32_t dataLen; - char data[]; -} SyncSnapshotSend; - -SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId); -void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg); -void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen); -void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg); -char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len); -SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len); -void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg); -void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg); -SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg); -char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg); - -// for debug ---------------------- -void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg); -void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg); -void syncSnapshotSendLog(const SyncSnapshotSend* pMsg); -void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg); - -// --------------------------------------------- -typedef struct SyncSnapshotRsp { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - SyncTerm term; - SyncIndex lastIndex; - SyncTerm lastTerm; - int64_t startTime; - int32_t ack; - int32_t code; - SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid -} SyncSnapshotRsp; - -SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId); -void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg); -void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen); -void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg); -char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len); -SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len); -void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg); -void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg); -SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg); -char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg); - -// for debug ---------------------- -void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg); -void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg); -void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg); -void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg); - -// --------------------------------------------- -typedef struct SyncLeaderTransfer { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - /* - SRaftId srcId; - SRaftId destId; - */ - SNodeInfo newNodeInfo; - SRaftId newLeaderId; -} SyncLeaderTransfer; - -SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId); -void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg); -void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen); -void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg); -char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len); -SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len); -void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg); -void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg); -SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg); -char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg); - -typedef enum { - SYNC_LOCAL_CMD_STEP_DOWN = 100, - SYNC_LOCAL_CMD_FOLLOWER_CMT, -} ESyncLocalCmd; - -const char* syncLocalCmdGetStr(int32_t cmd); - -typedef struct SyncLocalCmd { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - int32_t cmd; - SyncTerm sdNewTerm; // step down new term - SyncIndex fcIndex;// follower commit index - -} SyncLocalCmd; - -SyncLocalCmd* syncLocalCmdBuild(int32_t vgId); -void syncLocalCmdDestroy(SyncLocalCmd* pMsg); -void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen); -void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg); -char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len); -SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len); -void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg); -void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg); -SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg); -char* syncLocalCmd2Str(const SyncLocalCmd* pMsg); - -// for debug ---------------------- -void syncLocalCmdPrint(const SyncLocalCmd* pMsg); -void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg); -void syncLocalCmdLog(const SyncLocalCmd* pMsg); -void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); - -// on message ---------------------- -int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); -int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); - -int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); -int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); - -int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); - -int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); -int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg); - -int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg); -int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); - -int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); -int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); - -int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); -int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); - -// ----------------------------------------- -typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); -typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); -typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); -typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg); -typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg); -typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg); -typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); -typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg); -typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); -typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); - -// option ---------------------------------- -bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); -ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); - -// --------------------------------------------- - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_LIBS_SYNC_TOOLS_H*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 792ce67cd4c9be0eef09fe6b11ba0857ffb92f1d..c9c1baa4bc4fa37b69ec8d4cbb65c271494385ae 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -13,15 +13,11 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncAppendEntries.h" -#include "syncInt.h" -#include "syncRaftCfg.h" +#include "syncMessage.h" #include "syncRaftLog.h" #include "syncRaftStore.h" -#include "syncSnapshot.h" -#include "syncUtil.h" -#include "syncVoteMgr.h" -#include "wal.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index cf7c391a1d6c69b358b193666c7edc2d7f434fab..53d6b5d92f672f2ea7a1dd653be440c140318005 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -13,17 +13,14 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncAppendEntriesReply.h" +#include "syncMessage.h" #include "syncCommit.h" #include "syncIndexMgr.h" -#include "syncInt.h" -#include "syncRaftCfg.h" -#include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncReplication.h" #include "syncSnapshot.h" -#include "syncUtil.h" -#include "syncVoteMgr.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index a951b78e1ea44cbcad16e12739c6006e33d2c7e1..d2320fc6beec2a4fba966f06a0471e168f4cdf49 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -13,10 +13,9 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncCommit.h" #include "syncIndexMgr.h" -#include "syncInt.h" -#include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 85e45728775b57b674a2f11a6feb72580e9f279f..123ce5b581447c7fe05037ededbca8d212a29e5c 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -18,8 +18,8 @@ #include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" -#include "syncVoteMgr.h" #include "syncUtil.h" +#include "syncVoteMgr.h" // TLA+ Spec // RequestVote(i, j) == diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 3f3b794f46cb5c3b82b7d5476f400b71aa066c8e..a0e0a5a2c2df3ee4bbc593d11f3ec931fded140a 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -105,6 +105,7 @@ void syncEnvStopTimer() { #endif static void syncEnvTick(void *param, void *tmrId) { +#if 0 SSyncEnv *pSyncEnv = param; if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) { gSyncEnv.envTickTimerCounter++; @@ -121,4 +122,5 @@ static void syncEnvTick(void *param, void *tmrId) { gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter, gSyncEnv.envTickTimerMS, tmrId); } +#endif } diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 8e78aeedc335c1368dc62435cf21dbb205316e11..ca5e531528afd76fbbb68e61cbe3224e337b89a6 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -13,18 +13,16 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncIndexMgr.h" #include "syncUtil.h" -// SMatchIndex ----------------------------- - SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { - SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr)); + SSyncIndexMgr *pSyncIndexMgr = taosMemoryCalloc(1, sizeof(SSyncIndexMgr)); if (pSyncIndexMgr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr)); pSyncIndexMgr->replicas = &(pSyncNode->replicasId); pSyncIndexMgr->replicaNum = pSyncNode->replicaNum; @@ -97,54 +95,6 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf return SYNC_INDEX_INVALID; } -cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { - char u64buf[128] = {0}; - cJSON *pRoot = cJSON_CreateObject(); - - if (pSyncIndexMgr != NULL) { - cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum); - cJSON *pReplicas = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { - cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i])); - } - - { - int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum); - for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { - arr[i] = pSyncIndexMgr->index[i]; - } - cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); - taosMemoryFree(arr); - cJSON_AddItemToObject(pRoot, "index", pIndex); - } - - { - int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum); - for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { - arr[i] = pSyncIndexMgr->privateTerm[i]; - } - cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); - taosMemoryFree(arr); - cJSON_AddItemToObject(pRoot, "privateTerm", pIndex); - } - - snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - } - - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot); - return pJson; -} - -char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { - cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char *serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) { for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { @@ -201,35 +151,6 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa return -1; } -// for debug ------------------- -void syncIndexMgrPrint(SSyncIndexMgr *pObj) { - char *serialized = syncIndexMgr2Str(pObj); - printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) { - char *serialized = syncIndexMgr2Str(pObj); - printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncIndexMgrLog(SSyncIndexMgr *pObj) { - char *serialized = syncIndexMgr2Str(pObj); - sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) { - if (gRaftDetailLog) { - char *serialized = syncIndexMgr2Str(pObj); - sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } -} - void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) { for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3fcb563f3bced428398525c189fc2ead28b57dec..565b21eb555876cbab0d3f8353695298a887e1be 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftEntry.h" @@ -150,31 +151,6 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) { } } -// ---- message process SyncPing---- -SyncPing* syncPingBuild(uint32_t dataLen) { - uint32_t bytes = sizeof(SyncPing) + dataLen; - SyncPing* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->msgType = TDMT_SYNC_PING; - pMsg->dataLen = dataLen; - return pMsg; -} - -SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) { - uint32_t dataLen = strlen(str) + 1; - SyncPing* pMsg = syncPingBuild(dataLen); - pMsg->vgId = vgId; - pMsg->srcId = *srcId; - pMsg->destId = *destId; - snprintf(pMsg->data, pMsg->dataLen, "%s", str); - return pMsg; -} - -SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) { - SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping"); - return pMsg; -} void syncPingDestroy(SyncPing* pMsg) { if (pMsg != NULL) { @@ -193,16 +169,6 @@ void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); } -char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncPingSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncPing* pMsg = taosMemoryMalloc(bytes); @@ -212,117 +178,6 @@ SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { return pMsg; } -int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) { - return -1; - } - - if (tEncodeU32(&encoder, pMsg->bytes) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->vgId) < 0) { - return -1; - } - if (tEncodeU32(&encoder, pMsg->msgType) < 0) { - return -1; - } - if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) { - return -1; - } - if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) { - return -1; - } - if (tEncodeU32(&encoder, pMsg->dataLen) < 0) { - return -1; - } - if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) { - return -1; - } - - tEndEncode(&encoder); - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; -} - -SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) { - return NULL; - } - - SyncPing* pMsg = NULL; - uint32_t bytes; - if (tDecodeU32(&decoder, &bytes) < 0) { - return NULL; - } - - pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - pMsg->bytes = bytes; - - if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - uint32_t len; - char* data = NULL; - if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - ASSERT(len == pMsg->dataLen); - memcpy(pMsg->data, data, len); - - tEndDecode(&decoder); - tDecoderClear(&decoder); - return pMsg; -} - -void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { - syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); @@ -330,96 +185,6 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { return pMsg; } -cJSON* syncPing2Json(const SyncPing* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - taosMemoryFree(s); - } - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncPing", pRoot); - return pJson; -} - -char* syncPing2Str(const SyncPing* pMsg) { - cJSON* pJson = syncPing2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncPingPrint(const SyncPing* pMsg) { - char* serialized = syncPing2Str(pMsg); - printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPingPrint2(char* s, const SyncPing* pMsg) { - char* serialized = syncPing2Str(pMsg); - printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPingLog(const SyncPing* pMsg) { - char* serialized = syncPing2Str(pMsg); - sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncPingLog2(char* s, const SyncPing* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncPing2Str(pMsg); - sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } -} - // ---- message process SyncPingReply---- SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { uint32_t bytes = sizeof(SyncPingReply) + dataLen; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index f2b75def6b58425e68d78a4d1f02cf0b69a887c3..bf44341acd16bfbef42351d911fe1ee44aad9a44 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncRequestVote.h" +#include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" #include "syncUtil.h" diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 02b9bb40acbe2ab8c98a677a1c80c00ab2940b20..1acf16507aad7b8b8bafd438b288dd56ad7f0f91 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncRequestVoteReply.h" +#include "syncMessage.h" #include "syncRaftStore.h" #include "syncVoteMgr.h" diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 4ca4e26becfad520702711636846fa353b4ab55e..8a0a35ce336bfa9d1ed1d01fb96b79a98da24edb 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncVoteMgr.h" +#include "syncMessage.h" #include "syncUtil.h" static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index fb227dfad34d7bfcdcea58806ce7ab00317d3bcd..c29def9ca33849a2aee27f859fb2e2d257cdb32a 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -143,7 +143,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 + sTrace("==callback== ==ReConfigCb== flag:%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term); } diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 50d9d12eeed947cf2fefda1966e5b4abc188c003..93da2df5a20b8cda6c8eab60cb3566bcaef596af 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -110,7 +110,37 @@ char* snapshotSender2Str(SSyncSnapshotSender* pSender); cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver); char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver); - +cJSON* syncIndexMgr2Json(SSyncIndexMgr* pSyncIndexMgr); +char* syncIndexMgr2Str(SSyncIndexMgr* pSyncIndexMgr); +void syncIndexMgrPrint(SSyncIndexMgr* pObj); +void syncIndexMgrPrint2(char* s, SSyncIndexMgr* pObj); +void syncIndexMgrLog(SSyncIndexMgr* pObj); +void syncIndexMgrLog2(char* s, SSyncIndexMgr* pObj); + +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); +cJSON* syncRpcUnknownMsg2Json(); +char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); +void syncRpcMsgPrint(SRpcMsg* pMsg); +void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); +void syncRpcMsgLog(SRpcMsg* pMsg); +void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); + + +// origin syncMessage +SyncPing* syncPingBuild(uint32_t dataLen); +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); +char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len); +int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen); +SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen); +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); +cJSON* syncPing2Json(const SyncPing* pMsg); +char* syncPing2Str(const SyncPing* pMsg); +void syncPingPrint(const SyncPing* pMsg); +void syncPingPrint2(char* s, const SyncPing* pMsg); +void syncPingLog(const SyncPing* pMsg); +void syncPingLog2(char* s, const SyncPing* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c b/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c new file mode 100644 index 0000000000000000000000000000000000000000..1d3198c51d285f7a3de97e1c59b0676e1b15142c --- /dev/null +++ b/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "syncTest.h" + +void syncIndexMgrPrint(SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncIndexMgrLog(SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) { + if (gRaftDetailLog) { + char *serialized = syncIndexMgr2Str(pObj); + sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + +cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { + char u64buf[128] = {0}; + cJSON *pRoot = cJSON_CreateObject(); + + if (pSyncIndexMgr != NULL) { + cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum); + cJSON *pReplicas = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicas", pReplicas); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i])); + } + + { + int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + arr[i] = pSyncIndexMgr->index[i]; + } + cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); + taosMemoryFree(arr); + cJSON_AddItemToObject(pRoot, "index", pIndex); + } + + { + int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + arr[i] = pSyncIndexMgr->privateTerm[i]; + } + cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); + taosMemoryFree(arr); + cJSON_AddItemToObject(pRoot, "privateTerm", pIndex); + } + + snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot); + return pJson; +} + +char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { + cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index 012382d69d3bcda0a4f6cd2c2daae6f9b7830a99..cdd2ae045d4b60c2480ba16101ad94861015e0a8 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -16,6 +16,244 @@ #define _DEFAULT_SOURCE #include "syncTest.h" +// ---- message process SyncPing---- +SyncPing* syncPingBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SyncPing) + dataLen; + SyncPing* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = TDMT_SYNC_PING; + pMsg->dataLen = dataLen; + return pMsg; +} + +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPing* pMsg = syncPingBuild(dataLen); + pMsg->vgId = vgId; + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) { + SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping"); + return pMsg; +} + +char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncPingSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { + syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + if (tStartDecode(&decoder) < 0) { + return NULL; + } + + SyncPing* pMsg = NULL; + uint32_t bytes; + if (tDecodeU32(&decoder, &bytes) < 0) { + return NULL; + } + + pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + pMsg->bytes = bytes; + + if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + uint32_t len; + char* data = NULL; + if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + ASSERT(len == pMsg->dataLen); + memcpy(pMsg->data, data, len); + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return pMsg; +} + +int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) { + return -1; + } + + if (tEncodeU32(&encoder, pMsg->bytes) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->vgId) < 0) { + return -1; + } + if (tEncodeU32(&encoder, pMsg->msgType) < 0) { + return -1; + } + if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) { + return -1; + } + if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) { + return -1; + } + if (tEncodeU32(&encoder, pMsg->dataLen) < 0) { + return -1; + } + if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) { + return -1; + } + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +cJSON* syncPing2Json(const SyncPing* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + +char* syncPing2Str(const SyncPing* pMsg) { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncPingPrint(const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPingPrint2(char* s, const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPingLog(const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncPingLog2(char* s, const SyncPing* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncPing2Str(pMsg); + sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot;