From 2dfc9705729c982b8af332a7d55ed21badf4dd88 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 12 Nov 2022 16:40:09 +0800 Subject: [PATCH] enh: adjust sync append entry msg --- source/libs/sync/inc/syncAppendEntries.h | 2 +- source/libs/sync/inc/syncInt.h | 3 - source/libs/sync/inc/syncMessage.h | 24 +-- source/libs/sync/inc/syncReplication.h | 3 - source/libs/sync/src/syncAppendEntries.c | 17 +- source/libs/sync/src/syncMain.c | 27 +-- source/libs/sync/src/syncMessage.c | 180 ++---------------- source/libs/sync/src/syncReplication.c | 74 +++---- .../sync/test/sync_test_lib/inc/syncTest.h | 18 ++ .../test/sync_test_lib/src/syncMessageDebug.c | 173 ++++++++++++++++- 10 files changed, 261 insertions(+), 260 deletions(-) diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index a87a28baf5..7791160afa 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -88,7 +88,7 @@ extern "C" { // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 369aee8116..8232b35284 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -324,9 +324,6 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); -void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); -void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); - void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 25bf097faf..419f9591b3 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -97,24 +97,6 @@ typedef struct SyncAppendEntries { char data[]; } SyncAppendEntries; -SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId); -void syncAppendEntriesDestroy(SyncAppendEntries* pMsg); -void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen); -void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg); -char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len); -SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len); -void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg); -void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg); -SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg); -char* syncAppendEntries2Str(const SyncAppendEntries* pMsg); - -// for debug ---------------------- -void syncAppendEntriesPrint(const SyncAppendEntries* pMsg); -void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg); -void syncAppendEntriesLog(const SyncAppendEntries* pMsg); -void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); - typedef struct SyncAppendEntriesReply { uint32_t bytes; int32_t vgId; @@ -446,9 +428,8 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); // on message ---------------------- int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pMsg); - -int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); +int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); +int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); @@ -478,6 +459,7 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index e08430327b..c214f7d1de 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -53,9 +53,6 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, cons int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId); -int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg); -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index c9c1baa4bc..91c8d183e3 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -18,6 +18,7 @@ #include "syncMessage.h" #include "syncRaftLog.h" #include "syncRaftStore.h" +#include "syncUtil.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == @@ -123,7 +124,21 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { return 0; } -int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { +void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen, s); +} + +int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + SyncAppendEntries* pMsg = pRpcMsg->pCont; + // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { syncLogRecvAppendEntries(ths, pMsg, "not in my config"); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 636bf9410e..67b1fed491 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -150,9 +150,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { code = syncNodeOnRequestVoteReply(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); - code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); + syncNodeOnAppendEntries(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); @@ -2533,29 +2531,6 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); } -void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, - pMsg->dataLen, s); -} - -void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen, s); -} - void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { char host[64]; uint16_t port; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 7e1883d530..826549998e 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -63,7 +63,6 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR pClientRequest->dataLen = pOriginalRpcMsg->contLen; memcpy(pClientRequest->data, (char*)pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); - return 0; } @@ -122,175 +121,22 @@ int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } -// ---- message process SyncAppendEntries---- -SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId) { - uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; - SyncAppendEntries* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { + int32_t bytes = sizeof(SyncAppendEntries) + dataLen; + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; - pMsg->dataLen = dataLen; - return pMsg; -} - -void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); - ASSERT(pMsg->bytes == sizeof(SyncAppendEntries) + pMsg->dataLen); -} - -char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncAppendEntriesSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncAppendEntries* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncAppendEntriesDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncAppendEntriesSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg) { - syncAppendEntriesDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncAppendEntries* pMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); - cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->prevLogIndex); - cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->prevLogTerm); - cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex); - cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); - - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - taosMemoryFree(s); + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot); - return pJson; -} - -char* syncAppendEntries2Str(const SyncAppendEntries* pMsg) { - cJSON* pJson = syncAppendEntries2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncAppendEntriesPrint(const SyncAppendEntries* pMsg) { - char* serialized = syncAppendEntries2Str(pMsg); - printf("syncAppendEntriesPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg) { - char* serialized = syncAppendEntries2Str(pMsg); - printf("syncAppendEntriesPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncAppendEntriesLog(const SyncAppendEntries* pMsg) { - char* serialized = syncAppendEntries2Str(pMsg); - sTrace("syncAppendEntriesLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncAppendEntries2Str(pMsg); - sTrace("syncAppendEntriesLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } + SyncAppendEntries* pAppendEntries = pMsg->pCont; + pAppendEntries->bytes = bytes; + pAppendEntries->vgId = vgId; + pAppendEntries->msgType = TDMT_SYNC_APPEND_ENTRIES; + pAppendEntries->dataLen = dataLen; + return 0; } // ---- message process SyncAppendEntriesReply---- diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 2a3705e139..c63378a465 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -20,6 +20,9 @@ #include "syncRaftStore.h" #include "syncUtil.h" +static int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); +static int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); + // TLA+ Spec // AppendEntries(i, j) == // /\ i /= j @@ -65,6 +68,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); // prepare entry + SRpcMsg rpcMsg = {0}; SyncAppendEntries* pMsg = NULL; SSyncRaftEntry* pEntry; @@ -73,26 +77,23 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { if (code == 0) { ASSERT(pEntry != NULL); - pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); - ASSERT(pMsg != NULL); - memcpy(pMsg->data, pEntry, pEntry->bytes); - syncEntryDestory(pEntry); + code = syncBuildAppendEntries(&rpcMsg, pEntry->bytes, pSyncNode->vgId); + ASSERT(code == 0); + pMsg = rpcMsg.pCont; + memcpy(pMsg->data, pEntry, pEntry->bytes); } else { if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { // no entry in log - pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); - ASSERT(pMsg != NULL); + code = syncBuildAppendEntries(&rpcMsg, 0, pSyncNode->vgId); + ASSERT(code == 0); + pMsg = rpcMsg.pCont; } else { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); - sNError(pSyncNode, "replicate to %s:%d error, next-index:%" PRId64, host, port, nextIndex); - } while (0); - - syncAppendEntriesDestroy(pMsg); + char host[64]; + uint16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + sNError(pSyncNode, "replicate to %s:%d error, next-index:%" PRId64, host, port, nextIndex); return -1; } } @@ -109,9 +110,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { // pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId); // send msg - syncNodeMaybeSendAppendEntries(pSyncNode, pDestId, pMsg); - syncAppendEntriesDestroy(pMsg); - + syncNodeMaybeSendAppendEntries(pSyncNode, pDestId, &rpcMsg); return 0; } @@ -137,13 +136,23 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { - int32_t ret = 0; - syncLogSendAppendEntries(pSyncNode, pMsg, ""); +static void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, + "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen, s); +} - SRpcMsg rpcMsg; - syncAppendEntries2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); +int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { + int32_t ret = 0; + SyncAppendEntries* pMsg = pRpcMsg->pCont; + + syncLogSendAppendEntries(pSyncNode, pMsg, ""); + syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId); if (pState == NULL) { @@ -159,11 +168,12 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI return ret; } -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { - int32_t ret = 0; - if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) { - ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg); +int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { + int32_t ret = 0; + SyncAppendEntries* pMsg = pRpcMsg->pCont; + if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) { + ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg); } else { char logBuf[128]; char host[64]; @@ -175,16 +185,6 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest return ret; } -int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { - int32_t ret = 0; - syncLogSendAppendEntries(pSyncNode, pMsg, ""); - - SRpcMsg rpcMsg; - syncAppendEntries2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); - return ret; -} - int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) { int32_t ret = 0; syncLogSendHeartbeat(pSyncNode, pMsg, ""); diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index b930367870..ef3e4c3af9 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -277,6 +277,24 @@ void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg); void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg); void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg); +SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId); +void syncAppendEntriesDestroy(SyncAppendEntries* pMsg); +void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg); +char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len); +SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len); +void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg); +SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg); +char* syncAppendEntries2Str(const SyncAppendEntries* pMsg); + +// for debug ---------------------- +void syncAppendEntriesPrint(const SyncAppendEntries* pMsg); +void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg); +void syncAppendEntriesLog(const SyncAppendEntries* pMsg); +void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index ad8a019f0f..a5a30f0273 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -1307,4 +1307,175 @@ void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) { sTrace("syncRequestVoteReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); taosMemoryFree(serialized); } -} \ No newline at end of file +} + +// ---- message process SyncAppendEntries---- +SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId) { + uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; + SyncAppendEntries* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; + pMsg->dataLen = dataLen; + return pMsg; +} + +void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncAppendEntries) + pMsg->dataLen); +} + +char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncAppendEntriesSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncAppendEntries* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncAppendEntriesDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncAppendEntriesSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg) { + syncAppendEntriesDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncAppendEntries* pMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); + cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->prevLogIndex); + cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->prevLogTerm); + cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot); + return pJson; +} + +char* syncAppendEntries2Str(const SyncAppendEntries* pMsg) { + cJSON* pJson = syncAppendEntries2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncAppendEntriesPrint(const SyncAppendEntries* pMsg) { + char* serialized = syncAppendEntries2Str(pMsg); + printf("syncAppendEntriesPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg) { + char* serialized = syncAppendEntries2Str(pMsg); + printf("syncAppendEntriesPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncAppendEntriesLog(const SyncAppendEntries* pMsg) { + char* serialized = syncAppendEntries2Str(pMsg); + sTrace("syncAppendEntriesLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncAppendEntries2Str(pMsg); + sTrace("syncAppendEntriesLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} -- GitLab