From 8cca68f29d7bc04832cb174f292559a970bc37a6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 12 Nov 2022 12:58:08 +0800 Subject: [PATCH] enh: adjust request vote msg --- source/libs/sync/inc/syncElection.h | 4 +- source/libs/sync/inc/syncInt.h | 1 - source/libs/sync/inc/syncMessage.h | 40 ++--- source/libs/sync/inc/syncRequestVote.h | 2 +- source/libs/sync/src/syncElection.c | 68 ++++---- source/libs/sync/src/syncMain.c | 12 +- source/libs/sync/src/syncMessage.c | 158 ++---------------- source/libs/sync/src/syncRequestVote.c | 5 +- .../sync/test/sync_test_lib/inc/syncTest.h | 18 ++ .../test/sync_test_lib/src/syncMessageDebug.c | 151 +++++++++++++++++ 10 files changed, 232 insertions(+), 227 deletions(-) diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 662c17094e..53337e1a8a 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -34,9 +34,7 @@ extern "C" { // mdest |-> j]) // /\ UNCHANGED <> -int32_t syncNodeElect(SSyncNode* pSyncNode); -int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); -int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); +int32_t syncNodeElect(SSyncNode* pNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0d9bf99fd1..71b2de6a00 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -324,7 +324,6 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); -void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 589588924b..d5ac5e2d17 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -28,8 +28,6 @@ typedef enum ESyncTimeoutType { SYNC_TIMEOUT_HEARTBEAT, } ESyncTimeoutType; -const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); - typedef struct SyncTimeout { uint32_t bytes; int32_t vgId; @@ -40,9 +38,6 @@ typedef struct SyncTimeout { void* data; // need optimized } SyncTimeout; -int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, - SSyncNode* pNode); - typedef struct SyncClientRequest { uint32_t bytes; int32_t vgId; @@ -54,10 +49,6 @@ typedef struct SyncClientRequest { char data[]; // origin RpcMsg.pCont } SyncClientRequest; -int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, - bool isWeak, int32_t vgId); -int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); - typedef struct SyncClientRequestReply { uint32_t bytes; int32_t vgId; @@ -78,25 +69,6 @@ typedef struct SyncRequestVote { SyncTerm lastLogTerm; } SyncRequestVote; -SyncRequestVote* syncRequestVoteBuild(int32_t vgId); -void syncRequestVoteDestroy(SyncRequestVote* pMsg); -void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen); -void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg); -char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len); -SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len); -void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg); -void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg); -SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg); -char* syncRequestVote2Str(const SyncRequestVote* pMsg); - -// for debug ---------------------- -void syncRequestVotePrint(const SyncRequestVote* pMsg); -void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg); -void syncRequestVoteLog(const SyncRequestVote* pMsg); -void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg); - -// --------------------------------------------- typedef struct SyncRequestVoteReply { uint32_t bytes; int32_t vgId; @@ -505,8 +477,7 @@ void syncLocalCmdLog(const SyncLocalCmd* pMsg); void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); // on message ---------------------- - -int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); @@ -530,6 +501,15 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); +const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); + +int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, + SSyncNode* pNode); +int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, + bool isWeak, int32_t vgId); +int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); +int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index ae8e3ff1cf..7e7c5f7e16 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -44,7 +44,7 @@ extern "C" { // m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 123ce5b581..9f0839fc02 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -33,6 +33,41 @@ // mdest |-> j]) // /\ UNCHANGED <> +static void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host, + port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); +} + +static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { + if (pNode->state != TAOS_SYNC_STATE_CANDIDATE) { + sNTrace(pNode, "not candidate, stop elect"); + return 0; + } + + int32_t ret = 0; + for (int i = 0; i < pNode->peersNum; ++i) { + SRpcMsg rpcMsg = {0}; + ret = syncBuildRequestVote(&rpcMsg, pNode->vgId); + ASSERT(ret == 0); + + SyncRequestVote* pMsg = rpcMsg.pCont; + pMsg->srcId = pNode->myRaftId; + pMsg->destId = pNode->peersId[i]; + pMsg->term = pNode->pRaftStore->currentTerm; + + ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm); + ASSERT(ret == 0); + + ret = syncNodeSendMsgById(&pNode->peersId[i], pNode, &rpcMsg); + ASSERT(ret == 0); + } + + return ret; +} + int32_t syncNodeElect(SSyncNode* pSyncNode) { sNTrace(pSyncNode, "begin election"); @@ -81,36 +116,3 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { return ret; } - -int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { - if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) { - sNTrace(pSyncNode, "not candidate, stop elect"); - return 0; - } - - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = pSyncNode->peersId[i]; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - - ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm)); - ASSERT(ret == 0); - - ret = syncNodeSendRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); - ASSERT(ret == 0); - syncRequestVoteDestroy(pMsg); - } - return ret; -} - -int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { - int32_t ret = 0; - syncLogSendRequestVote(pSyncNode, pMsg, ""); - - SRpcMsg rpcMsg; - syncRequestVote2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); - return ret; -} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6fca1c634d..51d80cc395 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -146,9 +146,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote* pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); - code = syncNodeOnRequestVote(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); + syncNodeOnRequestVote(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg); @@ -2535,14 +2533,6 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); } -void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); -} - void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { char logBuf[256]; char host[64]; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 02cf054d9a..94c1e36644 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -88,155 +88,21 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const return 0; } -// ---- message process SyncRequestVote---- -SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { - uint32_t bytes = sizeof(SyncRequestVote); - SyncRequestVote* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) { + int32_t bytes = sizeof(SyncRequestVote); + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; - return pMsg; -} - -void syncRequestVoteDestroy(SyncRequestVote* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncRequestVoteSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncRequestVote* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncRequestVoteDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncRequestVoteSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) { - syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastLogIndex); - cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastLogTerm); - cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TDMT_SYNC_REQUEST_VOTE; + return -1; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot); - return pJson; -} - -char* syncRequestVote2Str(const SyncRequestVote* pMsg) { - cJSON* pJson = syncRequestVote2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncRequestVotePrint(const SyncRequestVote* pMsg) { - char* serialized = syncRequestVote2Str(pMsg); - printf("syncRequestVotePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg) { - char* serialized = syncRequestVote2Str(pMsg); - printf("syncRequestVotePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncRequestVoteLog(const SyncRequestVote* pMsg) { - char* serialized = syncRequestVote2Str(pMsg); - sTrace("syncRequestVoteLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncRequestVote2Str(pMsg); - sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } + SyncRequestVote* pRequestVote = pMsg->pCont; + pRequestVote->bytes = bytes; + pRequestVote->msgType = TDMT_SYNC_REQUEST_VOTE; + pRequestVote->vgId = vgId; + return 0; } // ---- message process SyncRequestVoteReply---- diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index bf44341acd..69c070f70d 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -88,8 +88,9 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM return false; } -int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = 0; +int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + int32_t ret = 0; + SyncRequestVote* pMsg = pRpcMsg->pCont; // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 8b0707420e..e6df5bdc5e 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -241,6 +241,24 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMs void syncClientRequestLog(const SyncClientRequest* pMsg); void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); +SyncRequestVote* syncRequestVoteBuild(int32_t vgId); +void syncRequestVoteDestroy(SyncRequestVote* pMsg); +void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen); +void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg); +char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len); +SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len); +void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg); +void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg); +SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg); +char* syncRequestVote2Str(const SyncRequestVote* pMsg); + +// for debug ---------------------- +void syncRequestVotePrint(const SyncRequestVote* pMsg); +void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg); +void syncRequestVoteLog(const SyncRequestVote* pMsg); +void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index 2c662fa02e..3c408f8eb5 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -1009,3 +1009,154 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) { taosMemoryFree(serialized); } } + +// ---- message process SyncRequestVote---- +SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncRequestVote); + SyncRequestVote* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; + return pMsg; +} + +void syncRequestVoteDestroy(SyncRequestVote* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncRequestVoteSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncRequestVote* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncRequestVoteDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncRequestVoteSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) { + syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastLogIndex); + cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastLogTerm); + cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot); + return pJson; +} + +char* syncRequestVote2Str(const SyncRequestVote* pMsg) { + cJSON* pJson = syncRequestVote2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncRequestVotePrint(const SyncRequestVote* pMsg) { + char* serialized = syncRequestVote2Str(pMsg); + printf("syncRequestVotePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg) { + char* serialized = syncRequestVote2Str(pMsg); + printf("syncRequestVotePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncRequestVoteLog(const SyncRequestVote* pMsg) { + char* serialized = syncRequestVote2Str(pMsg); + sTrace("syncRequestVoteLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncRequestVote2Str(pMsg); + sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} \ No newline at end of file -- GitLab