From 9988e85f852128bf76e7d7c4221012a2c0d9189a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 31 May 2022 21:31:07 +0800 Subject: [PATCH] add test sender, receiver --- source/libs/sync/inc/syncSnapshot.h | 16 ++--- source/libs/sync/src/syncSnapshot.c | 4 +- source/libs/sync/test/CMakeLists.txt | 28 ++++++++ .../sync/test/syncSnapshotReceiverTest.cpp | 65 +++++++++++++++++ source/libs/sync/test/syncSnapshotRspTest.cpp | 12 ++-- .../libs/sync/test/syncSnapshotSendTest.cpp | 12 ++-- .../libs/sync/test/syncSnapshotSenderTest.cpp | 70 +++++++++++++++++++ 7 files changed, 185 insertions(+), 22 deletions(-) create mode 100644 source/libs/sync/test/syncSnapshotReceiverTest.cpp create mode 100644 source/libs/sync/test/syncSnapshotSenderTest.cpp diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 99c92539f3..e1a252eed4 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -37,8 +37,8 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshot snapshot; int64_t sendingMS; @@ -52,15 +52,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); -cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); +cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; - void *pCurrentBlock; + void * pWriter; + void * pCurrentBlock; int32_t blockLen; SyncTerm term; @@ -72,8 +72,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index ab09c3eee9..5fc49d9ffa 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -175,9 +175,9 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { cJSON *pSnapshot = cJSON_CreateObject(); snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyIndex); - cJSON_AddStringToObject(pRoot, "lastApplyIndex", u64buf); + cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyTerm); - cJSON_AddStringToObject(pRoot, "lastApplyTerm", u64buf); + cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf); cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot); snprintf(u64buf, sizeof(u64buf), "%lu", pSender->sendingMS); diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index b348051eb6..d1e747f29d 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -40,6 +40,8 @@ add_executable(syncApplyMsgTest "") add_executable(syncConfigChangeTest "") add_executable(syncSnapshotSendTest "") add_executable(syncSnapshotRspTest "") +add_executable(syncSnapshotSenderTest "") +add_executable(syncSnapshotReceiverTest "") target_sources(syncTest @@ -210,6 +212,14 @@ target_sources(syncSnapshotRspTest PRIVATE "syncSnapshotRspTest.cpp" ) +target_sources(syncSnapshotSenderTest + PRIVATE + "syncSnapshotSenderTest.cpp" +) +target_sources(syncSnapshotReceiverTest + PRIVATE + "syncSnapshotReceiverTest.cpp" +) target_include_directories(syncTest @@ -422,6 +432,16 @@ target_include_directories(syncSnapshotRspTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncSnapshotSenderTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncSnapshotReceiverTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -592,6 +612,14 @@ target_link_libraries(syncSnapshotRspTest sync gtest_main ) +target_link_libraries(syncSnapshotSenderTest + sync + gtest_main +) +target_link_libraries(syncSnapshotReceiverTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp new file mode 100644 index 0000000000..e912ecc597 --- /dev/null +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -0,0 +1,65 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftStore.h" +#include "syncSnapshot.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {} +void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {} +void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {} + +void RestoreFinishCb(struct SSyncFSM* pFsm) {} +void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {} + +int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } + +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; } +int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } +int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } + +int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) { return 0; } +int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } +int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } + +SSyncSnapshotReceiver* createReceiver() { + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(*pSyncNode)); + pSyncNode->pRaftStore = (SRaftStore*)taosMemoryMalloc(sizeof(*(pSyncNode->pRaftStore))); + pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(*(pSyncNode->pFsm))); + pSyncNode->pFsm->FpSnapshotStartWrite = SnapshotStartWrite; + pSyncNode->pFsm->FpSnapshotStopWrite = SnapshotStopWrite; + pSyncNode->pFsm->FpSnapshotDoWrite = SnapshotDoWrite; + + SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, 2); + pReceiver->start = true; + pReceiver->ack = 20; + pReceiver->pWriter = (void*)0x11; + pReceiver->blockLen = 20; + pReceiver->pCurrentBlock = taosMemoryMalloc(pReceiver->blockLen); + snprintf((char*)(pReceiver->pCurrentBlock), pReceiver->blockLen, "%s", "hello"); + pReceiver->term = 66; + + return pReceiver; +} + +int main() { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + logTest(); + + SSyncSnapshotReceiver* pReceiver = createReceiver(); + sTrace("%s", snapshotReceiver2Str(pReceiver)); + + return 0; +} diff --git a/source/libs/sync/test/syncSnapshotRspTest.cpp b/source/libs/sync/test/syncSnapshotRspTest.cpp index 9e455c6efd..3c1ce11456 100644 --- a/source/libs/sync/test/syncSnapshotRspTest.cpp +++ b/source/libs/sync/test/syncSnapshotRspTest.cpp @@ -35,8 +35,8 @@ void test1() { void test2() { SyncSnapshotRsp *pMsg = createMsg(); - uint32_t len = pMsg->bytes; - char * serialized = (char *)taosMemoryMalloc(len); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); syncSnapshotRspSerialize(pMsg, serialized, len); SyncSnapshotRsp *pMsg2 = syncSnapshotRspBuild(1000); syncSnapshotRspDeserialize(serialized, len, pMsg2); @@ -49,8 +49,8 @@ void test2() { void test3() { SyncSnapshotRsp *pMsg = createMsg(); - uint32_t len; - char * serialized = syncSnapshotRspSerialize2(pMsg, &len); + uint32_t len; + char * serialized = syncSnapshotRspSerialize2(pMsg, &len); SyncSnapshotRsp *pMsg2 = syncSnapshotRspDeserialize2(serialized, len); syncSnapshotRspLog2((char *)"test3: syncSnapshotRspSerialize2 -> syncSnapshotRspDeserialize2 ", pMsg2); @@ -61,7 +61,7 @@ void test3() { void test4() { SyncSnapshotRsp *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncSnapshotRsp2RpcMsg(pMsg, &rpcMsg); SyncSnapshotRsp *pMsg2 = (SyncSnapshotRsp *)taosMemoryMalloc(rpcMsg.contLen); syncSnapshotRspFromRpcMsg(&rpcMsg, pMsg2); @@ -74,7 +74,7 @@ void test4() { void test5() { SyncSnapshotRsp *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncSnapshotRsp2RpcMsg(pMsg, &rpcMsg); SyncSnapshotRsp *pMsg2 = syncSnapshotRspFromRpcMsg2(&rpcMsg); syncSnapshotRspLog2((char *)"test5: syncSnapshotRsp2RpcMsg -> syncSnapshotRspFromRpcMsg2 ", pMsg2); diff --git a/source/libs/sync/test/syncSnapshotSendTest.cpp b/source/libs/sync/test/syncSnapshotSendTest.cpp index 8985a3f1a2..ca4ac2abeb 100644 --- a/source/libs/sync/test/syncSnapshotSendTest.cpp +++ b/source/libs/sync/test/syncSnapshotSendTest.cpp @@ -36,8 +36,8 @@ void test1() { void test2() { SyncSnapshotSend *pMsg = createMsg(); - uint32_t len = pMsg->bytes; - char * serialized = (char *)taosMemoryMalloc(len); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); syncSnapshotSendSerialize(pMsg, serialized, len); SyncSnapshotSend *pMsg2 = syncSnapshotSendBuild(pMsg->dataLen, 1000); syncSnapshotSendDeserialize(serialized, len, pMsg2); @@ -50,8 +50,8 @@ void test2() { void test3() { SyncSnapshotSend *pMsg = createMsg(); - uint32_t len; - char * serialized = syncSnapshotSendSerialize2(pMsg, &len); + uint32_t len; + char * serialized = syncSnapshotSendSerialize2(pMsg, &len); SyncSnapshotSend *pMsg2 = syncSnapshotSendDeserialize2(serialized, len); syncSnapshotSendLog2((char *)"test3: syncSnapshotSendSerialize2 -> syncSnapshotSendDeserialize2 ", pMsg2); @@ -62,7 +62,7 @@ void test3() { void test4() { SyncSnapshotSend *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); SyncSnapshotSend *pMsg2 = (SyncSnapshotSend *)taosMemoryMalloc(rpcMsg.contLen); syncSnapshotSendFromRpcMsg(&rpcMsg, pMsg2); @@ -75,7 +75,7 @@ void test4() { void test5() { SyncSnapshotSend *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); SyncSnapshotSend *pMsg2 = syncSnapshotSendFromRpcMsg2(&rpcMsg); syncSnapshotSendLog2((char *)"test5: syncSnapshotSend2RpcMsg -> syncSnapshotSendFromRpcMsg2 ", pMsg2); diff --git a/source/libs/sync/test/syncSnapshotSenderTest.cpp b/source/libs/sync/test/syncSnapshotSenderTest.cpp new file mode 100644 index 0000000000..76d021004f --- /dev/null +++ b/source/libs/sync/test/syncSnapshotSenderTest.cpp @@ -0,0 +1,70 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftStore.h" +#include "syncSnapshot.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {} +void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {} +void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {} + +void RestoreFinishCb(struct SSyncFSM* pFsm) {} +void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {} + +int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } + +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; } +int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } +int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } + +int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) { return 0; } +int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } +int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } + +SSyncSnapshotSender* createSender() { + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(*pSyncNode)); + pSyncNode->pRaftStore = (SRaftStore*)taosMemoryMalloc(sizeof(*(pSyncNode->pRaftStore))); + pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(*(pSyncNode->pFsm))); + pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead; + pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead; + pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead; + + SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2); + pSender->start = true; + pSender->seq = 10; + pSender->ack = 20; + pSender->pReader = (void*)0x11; + pSender->blockLen = 20; + pSender->pCurrentBlock = taosMemoryMalloc(pSender->blockLen); + snprintf((char*)(pSender->pCurrentBlock), pSender->blockLen, "%s", "hello"); + + pSender->snapshot.lastApplyIndex = 99; + pSender->snapshot.lastApplyTerm = 88; + pSender->sendingMS = 77; + pSender->term = 66; + + return pSender; +} + +int main() { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + logTest(); + + SSyncSnapshotSender* pSender = createSender(); + sTrace("%s", snapshotSender2Str(pSender)); + + return 0; +} -- GitLab