From 086ec29ca03ed09fbaa0195467deefa1707ec6c9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 2 Jul 2022 14:41:54 +0800 Subject: [PATCH] refactor(sync): add SyncClientRequestBatch --- include/libs/sync/syncTools.h | 53 ++++--- source/libs/sync/inc/syncRaftEntry.h | 9 +- source/libs/sync/inc/syncSnapshot.h | 14 +- source/libs/sync/src/syncAppendEntries.c | 15 +- source/libs/sync/src/syncMessage.c | 140 +++++++++++++++++- source/libs/sync/src/syncRaftEntry.c | 16 ++ source/libs/sync/src/syncReplication.c | 3 +- source/libs/sync/src/syncSnapshot.c | 10 +- source/libs/sync/test/CMakeLists.txt | 14 ++ .../sync/test/syncAppendEntriesBatchTest.cpp | 5 +- .../sync/test/syncClientRequestBatchTest.cpp | 125 ++++++++++++++++ .../test/syncConfigChangeSnapshotTest.cpp | 2 +- .../libs/sync/test/syncSnapshotSenderTest.cpp | 2 +- source/libs/sync/test/syncTestTool.cpp | 2 +- source/libs/transport/inc/transComm.h | 16 +- 15 files changed, 366 insertions(+), 60 deletions(-) create mode 100644 source/libs/sync/test/syncClientRequestBatchTest.cpp diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 745c63d5b3..8b658c8776 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -191,12 +191,12 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg); typedef struct SyncClientRequest { uint32_t bytes; int32_t vgId; - uint32_t msgType; // SyncClientRequest msgType - uint32_t originalRpcType; // user RpcMsg msgType + uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST + uint32_t originalRpcType; // origin RpcMsg msgType uint64_t seqNum; bool isWeak; - uint32_t dataLen; // user RpcMsg.contLen - char data[]; // user RpcMsg.pCont + uint32_t dataLen; // origin RpcMsg.contLen + char data[]; // origin RpcMsg.pCont } SyncClientRequest; SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); @@ -220,11 +220,6 @@ void syncClientRequestLog(const SyncClientRequest* pMsg); void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); // --------------------------------------------- -typedef struct SOffsetAndContLen { - int32_t offset; - int32_t contLen; -} SOffsetAndContLen; - typedef struct SRaftMeta { uint64_t seqNum; bool isWeak; @@ -232,20 +227,33 @@ typedef struct SRaftMeta { // block1: // block2: SRaftMeta array -// block3: rpc msg array (with pCont) +// block3: rpc msg array (with pCont pointer) typedef struct SyncClientRequestBatch { uint32_t bytes; int32_t vgId; - uint32_t msgType; // SyncClientRequestBatch msgType + uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH uint32_t dataCount; - uint32_t dataLen; // user RpcMsg.contLen - char data[]; // user RpcMsg.pCont + uint32_t dataLen; + char data[]; // block2, block3 } SyncClientRequestBatch; SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, int32_t vgId); void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); +void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg); +void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg); +SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg); +SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg); +SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg); +cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg); +char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg); + +// for debug ---------------------- +void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg); +void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg); // --------------------------------------------- typedef struct SyncClientRequestReply { @@ -318,12 +326,15 @@ void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg); void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg); // --------------------------------------------- +// data: entry + typedef struct SyncAppendEntries { uint32_t bytes; int32_t vgId; uint32_t msgType; SRaftId srcId; SRaftId destId; + // private data SyncTerm term; SyncIndex prevLogIndex; @@ -354,18 +365,14 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); // --------------------------------------------- -// define ahead -/* typedef struct SOffsetAndContLen { int32_t offset; int32_t contLen; } SOffsetAndContLen; -*/ -// block1: SOffsetAndContLen -// block2: SOffsetAndContLen Array -// block3: SRpcMsg Array -// block4: SRpcMsg pCont Array +// data: +// block1: SOffsetAndContLen Array +// block2: entry Array typedef struct SyncAppendEntriesBatch { uint32_t bytes; @@ -382,10 +389,12 @@ typedef struct SyncAppendEntriesBatch { SyncTerm privateTerm; int32_t dataCount; uint32_t dataLen; - char data[]; + char data[]; // block1, block2 } SyncAppendEntriesBatch; -SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId); +// SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId); + +SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId); void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg); void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen); void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 37f18a6388..82d5c0a6ea 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -29,19 +29,20 @@ extern "C" { typedef struct SSyncRaftEntry { uint32_t bytes; - uint32_t msgType; // SyncClientRequest msgType - uint32_t originalRpcType; // user RpcMsg msgType + uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST + uint32_t originalRpcType; // origin RpcMsg msgType uint64_t seqNum; bool isWeak; SyncTerm term; SyncIndex index; - uint32_t dataLen; // user RpcMsg.contLen - char data[]; // user RpcMsg.pCont + uint32_t dataLen; // origin RpcMsg.contLen + char data[]; // origin RpcMsg.pCont } SSyncRaftEntry; SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4 SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); +SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index); SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId); void syncEntryDestory(SSyncRaftEntry* pEntry); char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5 diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 6801e7212a..bb076ed2a1 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -40,8 +40,8 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshot snapshot; SSyncCfg lastConfig; @@ -62,14 +62,14 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); -char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); +char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); //--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshot snapshot; @@ -85,8 +85,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index be48227d40..c244b32cea 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -694,6 +694,8 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { return 0; } +static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEntriesBatch* pMsg) { return true; } + // really pre log match // prevLogIndex == -1 static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { @@ -844,8 +846,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc } while (0); // calculate logOK here, before will coredump, due to fake match - // bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); - bool logOK = true; + bool logOK = syncNodeOnAppendEntriesBatchLogOK(ths, pMsg); // not match // @@ -866,8 +867,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc if (condition) { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, not match, pre-index:%ld, pre-term:%lu, datalen:%d", - pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex, + pMsg->prevLogTerm, pMsg->dataLen); syncNodeEventLog(ths, logBuf); // prepare response msg @@ -914,7 +916,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc if (hasExtraEntries) { // make log same, rollback deleted entries - // code = syncNodeMakeLogSame(ths, pMsg); + code = syncNodeMakeLogSame2(ths, pMsg); ASSERT(code == 0); } @@ -926,7 +928,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // append entry batch for (int32_t i = 0; i < retArrSize; ++i) { - SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234); + // SSyncRaftEntry* pAppendEntry = syncEntryBuild4(&rpcMsgArr[i], ); + SSyncRaftEntry* pAppendEntry; code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { return -1; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index ad352df59f..664f4c8b73 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -996,7 +996,135 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet return pMsg; } -void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {} +void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pSyncMsg->msgType; + pRpcMsg->contLen = pSyncMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + memcpy(pRpcMsg->pCont, pSyncMsg, pRpcMsg->contLen); +} + +void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg) { + if (pMsg != NULL) { + int32_t arrSize = pMsg->dataCount; + int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); + for (int i = 0; i < arrSize; ++i) { + if (msgArr[i].pCont != NULL) { + rpcFreeCont(msgArr[i].pCont); + } + } + + taosMemoryFree(pMsg); + } +} + +SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg) { + SRaftMeta* raftMetaArr = (SRaftMeta*)(pSyncMsg->data); + return raftMetaArr; +} + +SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg) { + int32_t arrSize = pSyncMsg->dataCount; + int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize; + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pSyncMsg->data) + raftMetaArrayLen); + return msgArr; +} + +SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg) { + SyncClientRequestBatch* pSyncMsg = taosMemoryMalloc(pRpcMsg->contLen); + ASSERT(pSyncMsg != NULL); + memcpy(pSyncMsg, pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pRpcMsg->contLen == pSyncMsg->bytes); + + return pSyncMsg; +} + +cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount); + + SRaftMeta* metaArr = syncClientRequestBatchMetaArr(pMsg); + SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pMsg); + + cJSON* pMetaArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr); + for (int i = 0; i < pMsg->dataCount; ++i) { + cJSON* pMeta = cJSON_CreateObject(); + cJSON_AddNumberToObject(pMeta, "seqNum", metaArr[i].seqNum); + cJSON_AddNumberToObject(pMeta, "isWeak", metaArr[i].isWeak); + cJSON_AddItemToArray(pMetaArr, pMeta); + } + + cJSON* pMsgArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr); + for (int i = 0; i < pMsg->dataCount; ++i) { + cJSON* pRpcMsgJson = syncRpcMsg2Json(&msgArr[i]); + cJSON_AddItemToArray(pMsgArr, pRpcMsgJson); + } + + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncClientRequestBatch", pRoot); + return pJson; +} + +char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg) { + cJSON* pJson = syncClientRequestBatch2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg) { + char* serialized = syncClientRequestBatch2Str(pMsg); + printf("syncClientRequestBatchPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg) { + char* serialized = syncClientRequestBatch2Str(pMsg); + printf("syncClientRequestBatchPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg) { + char* serialized = syncClientRequestBatch2Str(pMsg); + sTrace("syncClientRequestBatchLog | len:%lu | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncClientRequestBatch2Str(pMsg); + sTraceLong("syncClientRequestBatchLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} // ---- message process SyncRequestVote---- SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { @@ -1475,6 +1603,7 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { // block3: SRpcMsg Array // block4: SRpcMsg pCont Array +/* SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId) { ASSERT(rpcMsgArr != NULL); ASSERT(arrSize > 0); @@ -1521,6 +1650,15 @@ SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t return pMsg; } +*/ + +// block1: SOffsetAndContLen +// block2: SOffsetAndContLen Array +// block3: entry Array + +SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) { + return NULL; +} void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) { if (pMsg != NULL) { diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 225360630c..0ee4684ea8 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -50,6 +50,22 @@ SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncInde return pEntry; } +SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index) { + SSyncRaftEntry* pEntry = syncEntryBuild(pOriginalMsg->contLen); + ASSERT(pEntry != NULL); + + pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; + pEntry->originalRpcType = pOriginalMsg->msgType; + pEntry->seqNum = 0; + pEntry->isWeak = 0; + pEntry->term = term; + pEntry->index = index; + pEntry->dataLen = pOriginalMsg->contLen; + memcpy(pEntry->data, pOriginalMsg->pCont, pOriginalMsg->contLen); + + return pEntry; +} + SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) { // init rpcMsg SMsgHead head; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4908822a3a..673cdca68d 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -162,7 +162,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { } } - SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, getCount, pSyncNode->vgId); + // SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, getCount, pSyncNode->vgId); + SyncAppendEntriesBatch* pMsg = NULL; ASSERT(pMsg != NULL); // prepare msg diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 4466cccbbc..dbab187f0e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -353,14 +353,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -612,7 +612,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -645,14 +645,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index c549b78399..37d9707cfd 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -24,6 +24,7 @@ add_executable(syncAppendEntriesTest "") add_executable(syncAppendEntriesBatchTest "") add_executable(syncAppendEntriesReplyTest "") add_executable(syncClientRequestTest "") +add_executable(syncClientRequestBatchTest "") add_executable(syncTimeoutTest "") add_executable(syncPingTest "") add_executable(syncPingReplyTest "") @@ -159,6 +160,10 @@ target_sources(syncClientRequestTest PRIVATE "syncClientRequestTest.cpp" ) +target_sources(syncClientRequestBatchTest + PRIVATE + "syncClientRequestBatchTest.cpp" +) target_sources(syncTimeoutTest PRIVATE "syncTimeoutTest.cpp" @@ -407,6 +412,11 @@ target_include_directories(syncClientRequestTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncClientRequestBatchTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncTimeoutTest PUBLIC "${TD_SOURCE_DIR}/include/libs/sync" @@ -658,6 +668,10 @@ target_link_libraries(syncClientRequestTest sync gtest_main ) +target_link_libraries(syncClientRequestBatchTest + sync + gtest_main +) target_link_libraries(syncTimeoutTest sync gtest_main diff --git a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp index 8784ddd637..b9000269e6 100644 --- a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp @@ -15,6 +15,7 @@ void logTest() { sFatal("--- sync log test: fatal"); } +/* SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) { SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); memset(pRpcMsg, 0, sizeof(SRpcMsg)); @@ -67,7 +68,6 @@ void test1() { syncAppendEntriesBatchDestroy(pMsg); } -/* void test2() { SyncAppendEntries *pMsg = createMsg(); uint32_t len = pMsg->bytes; @@ -126,9 +126,8 @@ int main() { sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; logTest(); - test1(); - /* + test1(); test2(); test3(); test4(); diff --git a/source/libs/sync/test/syncClientRequestBatchTest.cpp b/source/libs/sync/test/syncClientRequestBatchTest.cpp new file mode 100644 index 0000000000..ae74baeda4 --- /dev/null +++ b/source/libs/sync/test/syncClientRequestBatchTest.cpp @@ -0,0 +1,125 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) { + SyncPing *pSyncMsg = syncPingBuild(20); + snprintf(pSyncMsg->data, pSyncMsg->dataLen, "value_%d", i); + + SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); + memset(pRpcMsg, 0, sizeof(SRpcMsg)); + pRpcMsg->code = 10 * i; + syncPing2RpcMsg(pSyncMsg, pRpcMsg); + + syncPingDestroy(pSyncMsg); + return pRpcMsg; +} + +SyncClientRequestBatch *createMsg() { + SRpcMsg rpcMsgArr[5]; + memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); + for (int32_t i = 0; i < 5; ++i) { + SRpcMsg *pRpcMsg = createRpcMsg(i, 20); + rpcMsgArr[i] = *pRpcMsg; + taosMemoryFree(pRpcMsg); + } + + SRaftMeta raftArr[5]; + memset(raftArr, 0, sizeof(raftArr)); + for (int32_t i = 0; i < 5; ++i) { + raftArr[i].seqNum = i * 10; + raftArr[i].isWeak = i % 2; + } + + SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgArr, raftArr, 5, 1234); + return pMsg; +} + +void test1() { + SyncClientRequestBatch *pMsg = createMsg(); + syncClientRequestBatchLog2((char *)"==test1==", pMsg); + syncClientRequestBatchDestroyDeep(pMsg); +} + +/* +void test2() { + SyncClientRequest *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); + syncClientRequestSerialize(pMsg, serialized, len); + SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen); + syncClientRequestDeserialize(serialized, len, pMsg2); + syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncClientRequestDestroy(pMsg); + syncClientRequestDestroy(pMsg2); +} + +void test3() { + SyncClientRequest *pMsg = createMsg(); + uint32_t len; + char * serialized = syncClientRequestSerialize2(pMsg, &len); + SyncClientRequest *pMsg2 = syncClientRequestDeserialize2(serialized, len); + syncClientRequestLog2((char *)"test3: syncClientRequestSerialize3 -> syncClientRequestDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncClientRequestDestroy(pMsg); + syncClientRequestDestroy(pMsg2); +} + +void test4() { + SyncClientRequest *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pMsg, &rpcMsg); + SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(rpcMsg.contLen); + syncClientRequestFromRpcMsg(&rpcMsg, pMsg2); + syncClientRequestLog2((char *)"test4: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncClientRequestDestroy(pMsg); + syncClientRequestDestroy(pMsg2); +} + +void test5() { + SyncClientRequest *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pMsg, &rpcMsg); + SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg); + syncClientRequestLog2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncClientRequestDestroy(pMsg); + syncClientRequestDestroy(pMsg2); +} +*/ + +int main() { + gRaftDetailLog = true; + tsAsyncLog = 0; + sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + logTest(); + + test1(); + + /* +test2(); +test3(); +test4(); +test5(); +*/ + + return 0; +} diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 2908dd5907..b603d48b8b 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) { *ppReader = (void*)0xABCD; char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader); diff --git a/source/libs/sync/test/syncSnapshotSenderTest.cpp b/source/libs/sync/test/syncSnapshotSenderTest.cpp index dc38cd3df5..8d1f83b3b1 100644 --- a/source/libs/sync/test/syncSnapshotSenderTest.cpp +++ b/source/libs/sync/test/syncSnapshotSenderTest.cpp @@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { return 0; } +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 4e84ff0f7e..f7a4d91594 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) { *ppReader = (void*)0xABCD; char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index b06d541b75..21e7f7c7af 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,8 +96,8 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 15 // ms retry interval -#define TRANS_CONN_TIMEOUT 3 // connect timeout +#define TRANS_RETRY_INTERVAL 200 // ms retry interval +#define TRANS_CONN_TIMEOUT 3 // connect timeout typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; @@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) -#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) +#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) #define rpcIsReq(type) (type & 1U) #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) -#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) -#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) -#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) +#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) +#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) +#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) -#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); -#define transIsReq(type) (type & 1U) +#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); +#define transIsReq(type) (type & 1U) #define transLabel(trans) ((STrans*)trans)->label -- GitLab