From d5f7f8c398f03d05c41a86b715d8b01c59afd447 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 13 Nov 2022 16:11:49 +0800 Subject: [PATCH] refact: remove sync apply msg --- source/libs/sync/inc/syncMessage.h | 24 +-- source/libs/sync/src/syncMessage.c | 153 ------------------ .../sync/test/sync_test_lib/inc/syncTest.h | 20 +++ .../test/sync_test_lib/src/syncMessageDebug.c | 153 ++++++++++++++++++ 4 files changed, 175 insertions(+), 175 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index a38b7c10ff..89ed509161 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -172,27 +172,6 @@ typedef struct SyncApplyMsg { char data[]; // user RpcMsg.pCont } SyncApplyMsg; -SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen); -SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta); -void syncApplyMsgDestroy(SyncApplyMsg* pMsg); -void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen); -void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg); -char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len); -SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len); -void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ -void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg -SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg); -void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg -cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg); -char* syncApplyMsg2Str(const SyncApplyMsg* pMsg); - -// for debug ---------------------- -void syncApplyMsgPrint(const SyncApplyMsg* pMsg); -void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg); -void syncApplyMsgLog(const SyncApplyMsg* pMsg); -void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg); - -// --------------------------------------------- typedef struct SyncSnapshotSend { uint32_t bytes; int32_t vgId; @@ -352,7 +331,7 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode); -int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginalRpc, uint64_t seq, bool isWeak, int32_t vgId); +int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seq, bool isWeak, int32_t vgId); int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId); int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); @@ -362,6 +341,7 @@ int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 26bedb2b66..bee20d7c3d 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -223,159 +223,6 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } -// ---- message process SyncApplyMsg---- -SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { - uint32_t bytes = sizeof(SyncApplyMsg) + dataLen; - SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->msgType = TDMT_SYNC_APPLY_MSG; - pMsg->dataLen = dataLen; - return pMsg; -} - -SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta) { - SyncApplyMsg* pMsg = syncApplyMsgBuild(pOriginalRpcMsg->contLen); - pMsg->vgId = vgId; - pMsg->originalRpcType = pOriginalRpcMsg->msgType; - pMsg->fsmMeta = *pMeta; - memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); - return pMsg; -} - -void syncApplyMsgDestroy(SyncApplyMsg* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncApplyMsgSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncApplyMsgDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -// SyncApplyMsg to SRpcMsg, put it into ApplyQ -void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncApplyMsgSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -// get SRpcMsg from ApplyQ, to SyncApplyMsg -void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg) { - syncApplyMsgDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncApplyMsg* pMsg = syncApplyMsgDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - return pMsg; -} - -// SyncApplyMsg to OriginalRpcMsg -void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg) { - memset(pOriginalRpcMsg, 0, sizeof(*pOriginalRpcMsg)); - pOriginalRpcMsg->msgType = pMsg->originalRpcType; - pOriginalRpcMsg->contLen = pMsg->dataLen; - pOriginalRpcMsg->pCont = rpcMallocCont(pOriginalRpcMsg->contLen); - memcpy(pOriginalRpcMsg->pCont, pMsg->data, pOriginalRpcMsg->contLen); -} - -cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); - - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fsmMeta.index); - cJSON_AddStringToObject(pRoot, "fsmMeta.index", u64buf); - cJSON_AddNumberToObject(pRoot, "fsmMeta.isWeak", pMsg->fsmMeta.isWeak); - cJSON_AddNumberToObject(pRoot, "fsmMeta.code", pMsg->fsmMeta.code); - cJSON_AddNumberToObject(pRoot, "fsmMeta.state", pMsg->fsmMeta.state); - cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncStr(pMsg->fsmMeta.state)); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->fsmMeta.seqNum); - cJSON_AddStringToObject(pRoot, "fsmMeta.seqNum", u64buf); - - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - taosMemoryFree(s); - } - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncApplyMsg", pRoot); - return pJson; -} - -char* syncApplyMsg2Str(const SyncApplyMsg* pMsg) { - cJSON* pJson = syncApplyMsg2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncApplyMsgPrint(const SyncApplyMsg* pMsg) { - char* serialized = syncApplyMsg2Str(pMsg); - printf("syncApplyMsgPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg) { - char* serialized = syncApplyMsg2Str(pMsg); - printf("syncApplyMsgPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncApplyMsgLog(const SyncApplyMsg* pMsg) { - char* serialized = syncApplyMsg2Str(pMsg); - sTrace("ssyncApplyMsgLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncApplyMsg2Str(pMsg); - sTrace("syncApplyMsgLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } -} - // --------------------------------------------- SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId) { uint32_t bytes = sizeof(SyncSnapshotSend) + dataLen; diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 0df4c69d64..8a899406f1 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -389,6 +389,26 @@ void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg); int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg); +SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen); +SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta); +void syncApplyMsgDestroy(SyncApplyMsg* pMsg); +void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen); +void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg); +char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len); +SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len); +void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ +void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg +SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg); +void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg +cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg); +char* syncApplyMsg2Str(const SyncApplyMsg* pMsg); + +// for debug ---------------------- +void syncApplyMsgPrint(const SyncApplyMsg* pMsg); +void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg); +void syncApplyMsgLog(const SyncApplyMsg* pMsg); +void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index d1125c66d9..b797470d72 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -2239,3 +2239,156 @@ void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg) { taosMemoryFree(serialized); } } + +// ---- message process SyncApplyMsg---- +SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SyncApplyMsg) + dataLen; + SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = TDMT_SYNC_APPLY_MSG; + pMsg->dataLen = dataLen; + return pMsg; +} + +SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta) { + SyncApplyMsg* pMsg = syncApplyMsgBuild(pOriginalRpcMsg->contLen); + pMsg->vgId = vgId; + pMsg->originalRpcType = pOriginalRpcMsg->msgType; + pMsg->fsmMeta = *pMeta; + memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); + return pMsg; +} + +void syncApplyMsgDestroy(SyncApplyMsg* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncApplyMsgSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncApplyMsgDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +// SyncApplyMsg to SRpcMsg, put it into ApplyQ +void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncApplyMsgSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +// get SRpcMsg from ApplyQ, to SyncApplyMsg +void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg) { + syncApplyMsgDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncApplyMsg* pMsg = syncApplyMsgDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + +// SyncApplyMsg to OriginalRpcMsg +void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg) { + memset(pOriginalRpcMsg, 0, sizeof(*pOriginalRpcMsg)); + pOriginalRpcMsg->msgType = pMsg->originalRpcType; + pOriginalRpcMsg->contLen = pMsg->dataLen; + pOriginalRpcMsg->pCont = rpcMallocCont(pOriginalRpcMsg->contLen); + memcpy(pOriginalRpcMsg->pCont, pMsg->data, pOriginalRpcMsg->contLen); +} + +cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fsmMeta.index); + cJSON_AddStringToObject(pRoot, "fsmMeta.index", u64buf); + cJSON_AddNumberToObject(pRoot, "fsmMeta.isWeak", pMsg->fsmMeta.isWeak); + cJSON_AddNumberToObject(pRoot, "fsmMeta.code", pMsg->fsmMeta.code); + cJSON_AddNumberToObject(pRoot, "fsmMeta.state", pMsg->fsmMeta.state); + cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncStr(pMsg->fsmMeta.state)); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->fsmMeta.seqNum); + cJSON_AddStringToObject(pRoot, "fsmMeta.seqNum", u64buf); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncApplyMsg", pRoot); + return pJson; +} + +char* syncApplyMsg2Str(const SyncApplyMsg* pMsg) { + cJSON* pJson = syncApplyMsg2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncApplyMsgPrint(const SyncApplyMsg* pMsg) { + char* serialized = syncApplyMsg2Str(pMsg); + printf("syncApplyMsgPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg) { + char* serialized = syncApplyMsg2Str(pMsg); + printf("syncApplyMsgPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncApplyMsgLog(const SyncApplyMsg* pMsg) { + char* serialized = syncApplyMsg2Str(pMsg); + sTrace("ssyncApplyMsgLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncApplyMsg2Str(pMsg); + sTrace("syncApplyMsgLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} -- GitLab