From 26cb3c3856bc06fdc6eb5887890a3eb4d970cbc5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 12 Nov 2022 12:26:56 +0800 Subject: [PATCH] refact: remove sync batch codes --- source/libs/sync/inc/syncInt.h | 3 - source/libs/sync/inc/syncMessage.h | 68 --- source/libs/sync/src/syncMain.c | 24 - source/libs/sync/src/syncMessage.c | 197 -------- .../sync/test/syncAppendEntriesBatchTest.cpp | 2 +- .../sync/test/sync_test_lib/inc/syncBatch.h | 98 ++++ .../sync/test/sync_test_lib/inc/syncTest.h | 2 +- .../sync/test/sync_test_lib/src/syncBatch.c | 451 ++++++++++++++++++ .../test/sync_test_lib/src/syncMessageDebug.c | 212 -------- 9 files changed, 551 insertions(+), 506 deletions(-) create mode 100644 source/libs/sync/test/sync_test_lib/inc/syncBatch.h create mode 100644 source/libs/sync/test/sync_test_lib/src/syncBatch.c diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d9eefab33f..0d9bf99fd1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -333,9 +333,6 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); -void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s); -void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s); - void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index db3b01a053..589588924b 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -58,43 +58,6 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR bool isWeak, int32_t vgId); int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); -// --------------------------------------------- -typedef struct SRaftMeta { - uint64_t seqNum; - bool isWeak; -} SRaftMeta; - -// block1: -// block2: SRaftMeta array -// block3: rpc msg array (with pCont pointer) - -typedef struct SyncClientRequestBatch { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH - uint32_t dataCount; - uint32_t dataLen; - char data[]; // block2, block3 -} SyncClientRequestBatch; - -SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, - int32_t vgId); -void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); -void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg); -void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg); -SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg); -SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg); -SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg); -cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg); -char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg); - -// for debug ---------------------- -void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg); -void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg); -void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg); -void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg); - -// --------------------------------------------- typedef struct SyncClientRequestReply { uint32_t bytes; int32_t vgId; @@ -103,7 +66,6 @@ typedef struct SyncClientRequestReply { SRaftId leaderHint; } SyncClientRequestReply; -// --------------------------------------------- typedef struct SyncRequestVote { uint32_t bytes; int32_t vgId; @@ -213,36 +175,6 @@ typedef struct SOffsetAndContLen { // block1: SOffsetAndContLen Array // block2: entry Array -typedef struct SyncAppendEntriesBatch { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - - // private data - SyncTerm term; - SyncIndex prevLogIndex; - SyncTerm prevLogTerm; - SyncIndex commitIndex; - SyncTerm privateTerm; - int32_t dataCount; - uint32_t dataLen; - char data[]; // block1, block2 -} SyncAppendEntriesBatch; - -SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId); -SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg); -void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg); -void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen); -void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg); -char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len); -SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len); -void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg); -void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg); -SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg); - -// --------------------------------------------- typedef struct SyncAppendEntriesReply { uint32_t bytes; int32_t vgId; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1376143a88..6fca1c634d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2591,30 +2591,6 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs pMsg->dataLen, s); } -void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-append-entries-batch to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, - pMsg->dataLen, pMsg->dataCount, s); -} - -void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-append-entries-batch from %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, - pMsg->dataLen, pMsg->dataCount, s); -} - void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { char host[64]; uint16_t port; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index c5d693853b..02cf054d9a 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -88,96 +88,6 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const return 0; } -// ---- message process SyncClientRequestBatch---- - -// block1: -// block2: SRaftMeta array -// block3: rpc msg array (with pCont) - -SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, - int32_t vgId) { - ASSERT(rpcMsgPArr != NULL); - ASSERT(arrSize > 0); - - int32_t dataLen = 0; - int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; - int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; - dataLen += (raftMetaArrayLen + rpcArrayLen); - - uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen; - SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; - pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST_BATCH; - pMsg->dataCount = arrSize; - pMsg->dataLen = dataLen; - - SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data); - SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); - - for (int i = 0; i < arrSize; ++i) { - // init raftMetaArr - raftMetaArr[i].isWeak = raftArr[i].isWeak; - raftMetaArr[i].seqNum = raftArr[i].seqNum; - - // init msgArr - msgArr[i] = *(rpcMsgPArr[i]); - } - - return pMsg; -} - -void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pSyncMsg->msgType; - pRpcMsg->contLen = pSyncMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - memcpy(pRpcMsg->pCont, pSyncMsg, pRpcMsg->contLen); -} - -void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg) { - if (pMsg != NULL) { - int32_t arrSize = pMsg->dataCount; - int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; - SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); - for (int i = 0; i < arrSize; ++i) { - if (msgArr[i].pCont != NULL) { - rpcFreeCont(msgArr[i].pCont); - } - } - - taosMemoryFree(pMsg); - } -} - -SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg) { - SRaftMeta* raftMetaArr = (SRaftMeta*)(pSyncMsg->data); - return raftMetaArr; -} - -SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg) { - int32_t arrSize = pSyncMsg->dataCount; - int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; - SRpcMsg* msgArr = (SRpcMsg*)((char*)(pSyncMsg->data) + raftMetaArrayLen); - return msgArr; -} - -SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg) { - SyncClientRequestBatch* pSyncMsg = taosMemoryMalloc(pRpcMsg->contLen); - ASSERT(pSyncMsg != NULL); - memcpy(pSyncMsg, pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pRpcMsg->contLen == pSyncMsg->bytes); - - return pSyncMsg; -} - // ---- message process SyncRequestVote---- SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncRequestVote); @@ -648,113 +558,6 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { } } -// ---- message process SyncAppendEntriesBatch---- - -// block1: SOffsetAndContLen -// block2: SOffsetAndContLen Array -// block3: entry Array - -SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) { - ASSERT(entryPArr != NULL); - ASSERT(arrSize >= 0); - - int32_t dataLen = 0; - int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // - int32_t entryArrayLen = 0; - for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont - SSyncRaftEntry* pEntry = entryPArr[i]; - entryArrayLen += pEntry->bytes; - } - dataLen += (metaArrayLen + entryArrayLen); - - uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen; - SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; - pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_BATCH; - pMsg->dataCount = arrSize; - pMsg->dataLen = dataLen; - - SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); - char* pData = pMsg->data; - - for (int i = 0; i < arrSize; ++i) { - // init meta - if (i == 0) { - metaArr[i].offset = metaArrayLen; - metaArr[i].contLen = entryPArr[i]->bytes; - } else { - metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen; - metaArr[i].contLen = entryPArr[i]->bytes; - } - - // init entry array - ASSERT(metaArr[i].contLen == entryPArr[i]->bytes); - memcpy(pData + metaArr[i].offset, entryPArr[i], metaArr[i].contLen); - } - - return pMsg; -} - -SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg) { - return (SOffsetAndContLen*)(pMsg->data); -} - -void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); - ASSERT(pMsg->bytes == sizeof(SyncAppendEntriesBatch) + pMsg->dataLen); -} - -char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncAppendEntriesBatchSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncAppendEntriesBatchDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncAppendEntriesBatchSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg) { - syncAppendEntriesBatchDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - // ---- message process SyncAppendEntriesReply---- SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncAppendEntriesReply); diff --git a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp index 947a2cb05c..964d991bb0 100644 --- a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp @@ -1,5 +1,5 @@ -//#include #include "syncTest.h" +#include "syncBatch.h" void logTest() { sTrace("--- sync log test: trace"); diff --git a/source/libs/sync/test/sync_test_lib/inc/syncBatch.h b/source/libs/sync/test/sync_test_lib/inc/syncBatch.h new file mode 100644 index 0000000000..8d8859fc1f --- /dev/null +++ b/source/libs/sync/test/sync_test_lib/inc/syncBatch.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_BATCH_H +#define _TD_LIBS_SYNC_BATCH_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "syncInt.h" + +// --------------------------------------------- +typedef struct SRaftMeta { + uint64_t seqNum; + bool isWeak; +} SRaftMeta; + +// block1: +// block2: SRaftMeta array +// block3: rpc msg array (with pCont pointer) + +typedef struct SyncClientRequestBatch { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH + uint32_t dataCount; + uint32_t dataLen; + char data[]; // block2, block3 +} SyncClientRequestBatch; + +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, + int32_t vgId); +void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); +void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg); +void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg); +SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg); +SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg); +SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg); +cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg); +char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg); + +// for debug ---------------------- +void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg); + +typedef struct SyncAppendEntriesBatch { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncIndex prevLogIndex; + SyncTerm prevLogTerm; + SyncIndex commitIndex; + SyncTerm privateTerm; + int32_t dataCount; + uint32_t dataLen; + char data[]; // block1, block2 +} SyncAppendEntriesBatch; + +SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId); +SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg); +void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg); +char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len); +SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len); +void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg); +SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg); + +// --------------------------------------------- +void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s); +void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_INT_H*/ diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index fef534e0a1..8b0707420e 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -245,4 +245,4 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) } #endif -#endif /*_TD_LIBS_SYNC_RAFT_ENTRY_H*/ +#endif /*_TD_LIBS_SYNC_TEST_H*/ diff --git a/source/libs/sync/test/sync_test_lib/src/syncBatch.c b/source/libs/sync/test/sync_test_lib/src/syncBatch.c new file mode 100644 index 0000000000..b2db1e084a --- /dev/null +++ b/source/libs/sync/test/sync_test_lib/src/syncBatch.c @@ -0,0 +1,451 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "syncTest.h" +#include "syncBatch.h" + +// ---- message process SyncClientRequestBatch---- + +// block1: +// block2: SRaftMeta array +// block3: rpc msg array (with pCont) + +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, + int32_t vgId) { + ASSERT(rpcMsgPArr != NULL); + ASSERT(arrSize > 0); + + int32_t dataLen = 0; + int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; + int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; + dataLen += (raftMetaArrayLen + rpcArrayLen); + + uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen; + SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST_BATCH; + pMsg->dataCount = arrSize; + pMsg->dataLen = dataLen; + + SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data); + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); + + for (int i = 0; i < arrSize; ++i) { + // init raftMetaArr + raftMetaArr[i].isWeak = raftArr[i].isWeak; + raftMetaArr[i].seqNum = raftArr[i].seqNum; + + // init msgArr + msgArr[i] = *(rpcMsgPArr[i]); + } + + return pMsg; +} + +void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pSyncMsg->msgType; + pRpcMsg->contLen = pSyncMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + memcpy(pRpcMsg->pCont, pSyncMsg, pRpcMsg->contLen); +} + +void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg) { + if (pMsg != NULL) { + int32_t arrSize = pMsg->dataCount; + int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); + for (int i = 0; i < arrSize; ++i) { + if (msgArr[i].pCont != NULL) { + rpcFreeCont(msgArr[i].pCont); + } + } + + taosMemoryFree(pMsg); + } +} + +SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg) { + SRaftMeta* raftMetaArr = (SRaftMeta*)(pSyncMsg->data); + return raftMetaArr; +} + +SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg) { + int32_t arrSize = pSyncMsg->dataCount; + int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pSyncMsg->data) + raftMetaArrayLen); + return msgArr; +} + +SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg) { + SyncClientRequestBatch* pSyncMsg = taosMemoryMalloc(pRpcMsg->contLen); + ASSERT(pSyncMsg != NULL); + memcpy(pSyncMsg, pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pRpcMsg->contLen == pSyncMsg->bytes); + + return pSyncMsg; +} + +cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount); + + SRaftMeta* metaArr = syncClientRequestBatchMetaArr(pMsg); + SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pMsg); + + cJSON* pMetaArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr); + for (int i = 0; i < pMsg->dataCount; ++i) { + cJSON* pMeta = cJSON_CreateObject(); + cJSON_AddNumberToObject(pMeta, "seqNum", metaArr[i].seqNum); + cJSON_AddNumberToObject(pMeta, "isWeak", metaArr[i].isWeak); + cJSON_AddItemToArray(pMetaArr, pMeta); + } + + cJSON* pMsgArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr); + for (int i = 0; i < pMsg->dataCount; ++i) { + cJSON* pRpcMsgJson = syncRpcMsg2Json(&msgArr[i]); + cJSON_AddItemToArray(pMsgArr, pRpcMsgJson); + } + + char* s; + s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncClientRequestBatch", pRoot); + return pJson; +} + +char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg) { + cJSON* pJson = syncClientRequestBatch2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg) { + char* serialized = syncClientRequestBatch2Str(pMsg); + printf("syncClientRequestBatchPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg) { + char* serialized = syncClientRequestBatch2Str(pMsg); + printf("syncClientRequestBatchPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg) { + char* serialized = syncClientRequestBatch2Str(pMsg); + sTrace("syncClientRequestBatchLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncClientRequestBatch2Str(pMsg); + sLTrace("syncClientRequestBatchLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + +// ---- message process SyncAppendEntriesBatch---- + +// block1: SOffsetAndContLen +// block2: SOffsetAndContLen Array +// block3: entry Array + +SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) { + ASSERT(entryPArr != NULL); + ASSERT(arrSize >= 0); + + int32_t dataLen = 0; + int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // + int32_t entryArrayLen = 0; + for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont + SSyncRaftEntry* pEntry = entryPArr[i]; + entryArrayLen += pEntry->bytes; + } + dataLen += (metaArrayLen + entryArrayLen); + + uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen; + SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_BATCH; + pMsg->dataCount = arrSize; + pMsg->dataLen = dataLen; + + SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); + char* pData = pMsg->data; + + for (int i = 0; i < arrSize; ++i) { + // init meta + if (i == 0) { + metaArr[i].offset = metaArrayLen; + metaArr[i].contLen = entryPArr[i]->bytes; + } else { + metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen; + metaArr[i].contLen = entryPArr[i]->bytes; + } + + // init entry array + ASSERT(metaArr[i].contLen == entryPArr[i]->bytes); + memcpy(pData + metaArr[i].offset, entryPArr[i], metaArr[i].contLen); + } + + return pMsg; +} + +SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg) { + return (SOffsetAndContLen*)(pMsg->data); +} + +void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncAppendEntriesBatch) + pMsg->dataLen); +} + +char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncAppendEntriesBatchSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncAppendEntriesBatchDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncAppendEntriesBatchSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg) { + syncAppendEntriesBatchDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->prevLogIndex); + cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->prevLogTerm); + cJSON_AddStringToObject(pRoot, "prevLogTerm", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); + cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + + cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + + int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // + int32_t entryArrayLen = pMsg->dataLen - metaArrayLen; + + cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen); + cJSON_AddNumberToObject(pRoot, "entryArrayLen", entryArrayLen); + + SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); + + cJSON* pMetaArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr); + for (int i = 0; i < pMsg->dataCount; ++i) { + cJSON* pMeta = cJSON_CreateObject(); + cJSON_AddNumberToObject(pMeta, "offset", metaArr[i].offset); + cJSON_AddNumberToObject(pMeta, "contLen", metaArr[i].contLen); + cJSON_AddItemToArray(pMetaArr, pMeta); + } + + cJSON* pEntryArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "entryArr", pEntryArr); + for (int i = 0; i < pMsg->dataCount; ++i) { + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); + cJSON* pEntryJson = syncEntry2Json(pEntry); + cJSON_AddItemToArray(pEntryArr, pEntryJson); + } + + char* s; + s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncAppendEntriesBatch", pRoot); + return pJson; +} + +char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) { + cJSON* pJson = syncAppendEntriesBatch2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) { + char* serialized = syncAppendEntriesBatch2Str(pMsg); + printf("syncAppendEntriesBatchPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg) { + char* serialized = syncAppendEntriesBatch2Str(pMsg); + printf("syncAppendEntriesBatchPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg) { + char* serialized = syncAppendEntriesBatch2Str(pMsg); + sTrace("syncAppendEntriesBatchLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncAppendEntriesBatch2Str(pMsg); + sLTrace("syncAppendEntriesBatchLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + +void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-append-entries-batch to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen, pMsg->dataCount, s); +} + +void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-append-entries-batch from %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen, pMsg->dataCount, s); +} \ No newline at end of file diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index 56c35fcef8..2c662fa02e 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -695,138 +695,6 @@ void syncRpcMsgLog2(char* s, SRpcMsg* pMsg) { } } -cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->prevLogIndex); - cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->prevLogTerm); - cJSON_AddStringToObject(pRoot, "prevLogTerm", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex); - cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); - cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); - - cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount); - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - - int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // - int32_t entryArrayLen = pMsg->dataLen - metaArrayLen; - - cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen); - cJSON_AddNumberToObject(pRoot, "entryArrayLen", entryArrayLen); - - SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); - - cJSON* pMetaArr = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr); - for (int i = 0; i < pMsg->dataCount; ++i) { - cJSON* pMeta = cJSON_CreateObject(); - cJSON_AddNumberToObject(pMeta, "offset", metaArr[i].offset); - cJSON_AddNumberToObject(pMeta, "contLen", metaArr[i].contLen); - cJSON_AddItemToArray(pMetaArr, pMeta); - } - - cJSON* pEntryArr = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "entryArr", pEntryArr); - for (int i = 0; i < pMsg->dataCount; ++i) { - SSyncRaftEntry* pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); - cJSON* pEntryJson = syncEntry2Json(pEntry); - cJSON_AddItemToArray(pEntryArr, pEntryJson); - } - - char* s; - s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - taosMemoryFree(s); - } - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncAppendEntriesBatch", pRoot); - return pJson; -} - -char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) { - cJSON* pJson = syncAppendEntriesBatch2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) { - char* serialized = syncAppendEntriesBatch2Str(pMsg); - printf("syncAppendEntriesBatchPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg) { - char* serialized = syncAppendEntriesBatch2Str(pMsg); - printf("syncAppendEntriesBatchPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg) { - char* serialized = syncAppendEntriesBatch2Str(pMsg); - sTrace("syncAppendEntriesBatchLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncAppendEntriesBatch2Str(pMsg); - sLTrace("syncAppendEntriesBatchLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } -} - cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) { char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); @@ -982,86 +850,6 @@ char* syncClientRequest2Str(const SyncClientRequest* pMsg) { return serialized; } -cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount); - - SRaftMeta* metaArr = syncClientRequestBatchMetaArr(pMsg); - SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pMsg); - - cJSON* pMetaArr = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr); - for (int i = 0; i < pMsg->dataCount; ++i) { - cJSON* pMeta = cJSON_CreateObject(); - cJSON_AddNumberToObject(pMeta, "seqNum", metaArr[i].seqNum); - cJSON_AddNumberToObject(pMeta, "isWeak", metaArr[i].isWeak); - cJSON_AddItemToArray(pMetaArr, pMeta); - } - - cJSON* pMsgArr = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr); - for (int i = 0; i < pMsg->dataCount; ++i) { - cJSON* pRpcMsgJson = syncRpcMsg2Json(&msgArr[i]); - cJSON_AddItemToArray(pMsgArr, pRpcMsgJson); - } - - char* s; - s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - taosMemoryFree(s); - } - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncClientRequestBatch", pRoot); - return pJson; -} - -char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg) { - cJSON* pJson = syncClientRequestBatch2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg) { - char* serialized = syncClientRequestBatch2Str(pMsg); - printf("syncClientRequestBatchPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg) { - char* serialized = syncClientRequestBatch2Str(pMsg); - printf("syncClientRequestBatchPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg) { - char* serialized = syncClientRequestBatch2Str(pMsg); - sTrace("syncClientRequestBatchLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncClientRequestBatch2Str(pMsg); - sLTrace("syncClientRequestBatchLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } -} - // for debug ---------------------- void syncClientRequestPrint(const SyncClientRequest* pMsg) { char* serialized = syncClientRequest2Str(pMsg); -- GitLab