From cd19ff0173547c577651e66b2aed4ad406761df3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 12 Nov 2022 13:17:56 +0800 Subject: [PATCH] enh: adjust request vote reply --- source/libs/sync/inc/syncInt.h | 5 - source/libs/sync/inc/syncMessage.h | 24 +-- source/libs/sync/inc/syncRequestVoteReply.h | 2 +- source/libs/sync/src/syncMain.c | 43 +---- source/libs/sync/src/syncMessage.c | 155 ++---------------- source/libs/sync/src/syncRequestVote.c | 27 ++- source/libs/sync/src/syncRequestVoteReply.c | 15 +- .../sync/test/sync_test_lib/inc/syncTest.h | 18 ++ .../test/sync_test_lib/src/syncMessageDebug.c | 148 +++++++++++++++++ 9 files changed, 225 insertions(+), 212 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 71b2de6a00..369aee8116 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -324,11 +324,6 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); -void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); - -void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); -void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); - void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index d5ac5e2d17..663427cf20 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -80,27 +80,6 @@ typedef struct SyncRequestVoteReply { bool voteGranted; } SyncRequestVoteReply; -SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId); -void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg); -void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen); -void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg); -char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len); -SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len); -void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg); -void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg); -SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg); -char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg); - -// for debug ---------------------- -void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg); -void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg); -void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg); -void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg); - -// --------------------------------------------- -// data: entry - typedef struct SyncAppendEntries { uint32_t bytes; int32_t vgId; @@ -478,7 +457,7 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); // on message ---------------------- int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); +int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); @@ -509,6 +488,7 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR bool isWeak, int32_t vgId); int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index ea43f0301d..5d46365280 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -39,7 +39,7 @@ extern "C" { // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); +int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 51d80cc395..6e8a927e3f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -148,9 +148,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { syncNodeOnRequestVote(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); - code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); + code = syncNodeOnRequestVoteReply(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); @@ -376,7 +374,7 @@ bool syncIsReadyForRead(int64_t rid) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); if (code == 0 && pEntry != NULL) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { ready = true; @@ -1652,9 +1650,13 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) // simulate get vote from outside void syncNodeVoteForSelf(SSyncNode* pSyncNode) { - syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); + syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId); - SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; + int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId); + if (ret == 0) return; + + SyncRequestVoteReply* pMsg = rpcMsg.pCont; pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->myRaftId; pMsg->term = pSyncNode->pRaftStore->currentTerm; @@ -1662,11 +1664,9 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { voteGrantedVote(pSyncNode->pVotesGranted, pMsg); votesRespondAdd(pSyncNode->pVotesRespond, pMsg); - syncRequestVoteReplyDestroy(pMsg); + rpcFreeCont(rpcMsg.pCont); } -// snapshot -------------- - // return if has a snapshot bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool ret = false; @@ -2533,31 +2533,6 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); } -void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { - char logBuf[256]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); -} - -void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, - pMsg->voteGranted, s); -} - -void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, - pMsg->voteGranted, s); -} - void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { char host[64]; uint16_t port; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 94c1e36644..7e1883d530 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -105,152 +105,21 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) { return 0; } -// ---- message process SyncRequestVoteReply---- -SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) { - uint32_t bytes = sizeof(SyncRequestVoteReply); - SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) { + int32_t bytes = sizeof(SyncRequestVoteReply); + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; - return pMsg; -} - -void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncRequestVoteReplyDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncRequestVoteReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg) { - syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted); + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TDMT_SYNC_REQUEST_VOTE; + return -1; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot); - return pJson; -} - -char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) { - cJSON* pJson = syncRequestVoteReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg) { - char* serialized = syncRequestVoteReply2Str(pMsg); - printf("syncRequestVoteReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg) { - char* serialized = syncRequestVoteReply2Str(pMsg); - printf("syncRequestVoteReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) { - char* serialized = syncRequestVoteReply2Str(pMsg); - sTrace("syncRequestVoteReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncRequestVoteReply2Str(pMsg); - sTrace("syncRequestVoteReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } + SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont; + pRequestVoteReply->bytes = bytes; + pRequestVoteReply->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; + pRequestVoteReply->vgId = vgId; + return 0; } // ---- message process SyncAppendEntries---- diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 69c070f70d..9085572ebe 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -88,6 +88,23 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM return false; } +static void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { + char logBuf[256]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); +} + +static void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, + pMsg->voteGranted, s); +} + int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncRequestVote* pMsg = pRpcMsg->pCont; @@ -122,7 +139,11 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } // send msg - SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId); + SRpcMsg rpcMsg = {0}; + ret = syncBuildRequestVoteReply(&rpcMsg, ths->vgId); + ASSERT(ret); + + SyncRequestVoteReply* pReply = rpcMsg.pCont; pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; pReply->term = ths->pRaftStore->currentTerm; @@ -136,10 +157,6 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogSendRequestVoteReply(ths, pReply, ""); } while (0); - SRpcMsg rpcMsg; - syncRequestVoteReply2RpcMsg(pReply, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncRequestVoteReplyDestroy(pReply); - return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 1acf16507a..a9d363630e 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -17,6 +17,7 @@ #include "syncRequestVoteReply.h" #include "syncMessage.h" #include "syncRaftStore.h" +#include "syncUtil.h" #include "syncVoteMgr.h" // TLA+ Spec @@ -36,8 +37,18 @@ // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = 0; + +static void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, + pMsg->voteGranted, s); +} + +int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + int32_t ret = 0; + SyncRequestVoteReply* pMsg = pRpcMsg->pCont; // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index e6df5bdc5e..b930367870 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -259,6 +259,24 @@ void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg); void syncRequestVoteLog(const SyncRequestVote* pMsg); void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg); +SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId); +void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg); +void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen); +void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg); +char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len); +SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len); +void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg); +void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg); +SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg); +char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg); + +// for debug ---------------------- +void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index 3c408f8eb5..ad8a019f0f 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -1159,4 +1159,152 @@ void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) { sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } +} + +// ---- message process SyncRequestVoteReply---- +SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncRequestVoteReply); + SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; + return pMsg; +} + +void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncRequestVoteReplyDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncRequestVoteReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg) { + syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot); + return pJson; +} + +char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) { + cJSON* pJson = syncRequestVoteReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + printf("syncRequestVoteReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + printf("syncRequestVoteReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + sTrace("syncRequestVoteReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncRequestVoteReply2Str(pMsg); + sTrace("syncRequestVoteReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } } \ No newline at end of file -- GitLab