From 6035f0305d39911dbff10c35fd32a0aa57068d14 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Apr 2022 11:51:00 +0800 Subject: [PATCH] sync integration add SyncApplyMsg --- include/libs/sync/syncTools.h | 33 +++++++++++++++++++++++++++++- source/libs/sync/src/syncMessage.c | 28 +++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 567889191a..327450c10a 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -363,6 +363,37 @@ void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg); void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg); void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg); +// --------------------------------------------- +typedef struct SyncApplyMsg { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; // user SyncApplyMsg msgType + uint32_t originalRpcType; // user RpcMsg msgType + SFsmCbMeta fsmMeta; + uint32_t dataLen; // user RpcMsg.contLen + char data[]; // user RpcMsg.pCont +} SyncApplyMsg; + +SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen); +SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta); +void syncApplyMsgDestroy(SyncApplyMsg* pMsg); +void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen); +void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg); +char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len); +SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len); +void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ +void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg +void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg +SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg); +char* syncApplyMsg2Str(const SyncApplyMsg* pMsg); + +// for debug ---------------------- +void syncApplyMsgPrint(const SyncApplyMsg* pMsg); +void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg); +void ssyncApplyMsgLog(const SyncApplyMsg* pMsg); +void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg); + // on message ---------------------- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); @@ -373,7 +404,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); -//--------------------- +// --------------------------------------------- #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 76c68569c3..1299b37769 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -1531,3 +1531,31 @@ void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) { sTrace("syncAppendEntriesReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); } + +// ---- message process SyncApplyMsg---- +SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { return NULL; } +SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta) { return NULL; } +void syncApplyMsgDestroy(SyncApplyMsg* pMsg) {} +void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen) {} +void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg) {} +char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) { return NULL; } +SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len) { return NULL; } + +// SyncApplyMsg to SRpcMsg, put it into ApplyQ +void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg) {} + +// get SRpcMsg from ApplyQ, to SyncApplyMsg +void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg) {} + +// SyncApplyMsg to OriginalRpcMsg +void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg) {} + +SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg) { return NULL; } +cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) { return NULL; } +char* syncApplyMsg2Str(const SyncApplyMsg* pMsg) { return NULL; } + +// for debug ---------------------- +void syncApplyMsgPrint(const SyncApplyMsg* pMsg) {} +void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg) {} +void ssyncApplyMsgLog(const SyncApplyMsg* pMsg) {} +void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) {} -- GitLab