diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3f19593e506a7057fc5121022d7688310e0d3edb..2876577410e977f90895d964b1f8e416d2efee26 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -47,6 +47,12 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcUnknownMsg2Json(); char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); +// for debug ---------------------- +void syncRpcMsgPrint(SRpcMsg* pMsg); +void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); +void syncRpcMsgLog(SRpcMsg* pMsg); +void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); + // --------------------------------------------- typedef enum ESyncTimeoutType { SYNC_TIMEOUT_PING = 100, diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 477ba1929e46a78b1876ca3f5be080bc7fb22204..5a55bbc11f855b6040cc6a7364338a08ffa8796c 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -23,39 +23,47 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { // in compiler optimization, switch case = if else constants if (pRpcMsg->msgType == SYNC_TIMEOUT) { - SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont; + SyncTimeout* pSyncMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncTimeout2Json(pSyncMsg); + syncTimeoutDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_PING) { - SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; + SyncPing* pSyncMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncPing2Json(pSyncMsg); + syncPingDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { - SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont; + SyncPingReply* pSyncMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncPingReply2Json(pSyncMsg); + syncPingReplyDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { - SyncClientRequest* pSyncMsg = (SyncClientRequest*)pRpcMsg->pCont; + SyncClientRequest* pSyncMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncClientRequest2Json(pSyncMsg); + syncClientRequestDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { pRoot = syncRpcUnknownMsg2Json(); } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { - SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont; + SyncRequestVote* pSyncMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncRequestVote2Json(pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont; + SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncRequestVoteReply2Json(pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { - SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont; + SyncAppendEntries* pSyncMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncAppendEntries2Json(pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; + SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncAppendEntriesReply2Json(pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); } else { pRoot = syncRpcUnknownMsg2Json(); @@ -72,7 +80,7 @@ cJSON* syncRpcUnknownMsg2Json() { cJSON_AddStringToObject(pRoot, "data", "known message"); cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + cJSON_AddItemToObject(pJson, "SyncUnknown", pRoot); return pJson; } @@ -83,6 +91,33 @@ char* syncRpcMsg2Str(SRpcMsg* pRpcMsg) { return serialized; } +// for debug ---------------------- +void syncRpcMsgPrint(SRpcMsg* pMsg) { + char* serialized = syncRpcMsg2Str(pMsg); + printf("syncRpcMsgPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg) { + char* serialized = syncRpcMsg2Str(pMsg); + printf("syncRpcMsgPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncRpcMsgLog(SRpcMsg* pMsg) { + char* serialized = syncRpcMsg2Str(pMsg); + sTrace("syncRpcMsgLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncRpcMsgLog2(char* s, SRpcMsg* pMsg) { + char* serialized = syncRpcMsg2Str(pMsg); + sTrace("syncRpcMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + // ---- message process SyncTimeout---- SyncTimeout* syncTimeoutBuild() { uint32_t bytes = sizeof(SyncTimeout); diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 3a87d3f2e2280af25cff7770ba1b7c46ffa56c24..6648ae35807d06db50770d14fadc9fe1a106d611 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -25,6 +25,7 @@ add_executable(syncClientRequestTest "") add_executable(syncTimeoutTest "") add_executable(syncPingTest "") add_executable(syncPingReplyTest "") +add_executable(syncRpcMsgTest "") target_sources(syncTest @@ -135,6 +136,10 @@ target_sources(syncPingReplyTest PRIVATE "syncPingReplyTest.cpp" ) +target_sources(syncRpcMsgTest + PRIVATE + "syncRpcMsgTest.cpp" +) target_include_directories(syncTest @@ -272,6 +277,11 @@ target_include_directories(syncPingReplyTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncRpcMsgTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -382,6 +392,10 @@ target_link_libraries(syncPingReplyTest sync gtest_main ) +target_link_libraries(syncRpcMsgTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp index c883ab941acaa518d31c05a146226d180a76d261..362da67c66869a85a06b6d34398ffd5f30d0f165 100644 --- a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp @@ -22,6 +22,7 @@ SyncAppendEntriesReply *createMsg() { pMsg->destId.vgId = 100; pMsg->success = true; pMsg->matchIndex = 77; + return pMsg; } void test1() { diff --git a/source/libs/sync/test/syncPingReplyTest.cpp b/source/libs/sync/test/syncPingReplyTest.cpp index 27f88b698cbf573595c7dc133e3b5c8e4a96dbb5..8e1448e7818c8d41c6b880208c2f4fe78ecb4035 100644 --- a/source/libs/sync/test/syncPingReplyTest.cpp +++ b/source/libs/sync/test/syncPingReplyTest.cpp @@ -32,8 +32,8 @@ void test1() { void test2() { SyncPingReply *pMsg = createMsg(); - uint32_t len = pMsg->bytes; - char * serialized = (char *)malloc(len); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); syncPingReplySerialize(pMsg, serialized, len); SyncPingReply *pMsg2 = syncPingReplyBuild(pMsg->dataLen); syncPingReplyDeserialize(serialized, len, pMsg2); @@ -46,8 +46,8 @@ void test2() { void test3() { SyncPingReply *pMsg = createMsg(); - uint32_t len; - char * serialized = syncPingReplySerialize2(pMsg, &len); + uint32_t len; + char * serialized = syncPingReplySerialize2(pMsg, &len); SyncPingReply *pMsg2 = syncPingReplyDeserialize2(serialized, len); syncPingReplyPrint2((char *)"test3: syncPingReplySerialize3 -> syncPingReplyDeserialize2 ", pMsg2); @@ -58,7 +58,7 @@ void test3() { void test4() { SyncPingReply *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsg, &rpcMsg); SyncPingReply *pMsg2 = (SyncPingReply *)malloc(rpcMsg.contLen); syncPingReplyFromRpcMsg(&rpcMsg, pMsg2); @@ -70,7 +70,7 @@ void test4() { void test5() { SyncPingReply *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsg, &rpcMsg); SyncPingReply *pMsg2 = syncPingReplyFromRpcMsg2(&rpcMsg); syncPingReplyPrint2((char *)"test5: syncPingReply2RpcMsg -> syncPingReplyFromRpcMsg2 ", pMsg2); diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 8776d2de5219cd96e559c70b149f3e61ae41b6d1..83394b0e77404b32ef665643fc85cdf6f8a602b6 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -32,8 +32,8 @@ void test1() { void test2() { SyncPing *pMsg = createMsg(); - uint32_t len = pMsg->bytes; - char * serialized = (char *)malloc(len); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); syncPingSerialize(pMsg, serialized, len); SyncPing *pMsg2 = syncPingBuild(pMsg->dataLen); syncPingDeserialize(serialized, len, pMsg2); @@ -46,8 +46,8 @@ void test2() { void test3() { SyncPing *pMsg = createMsg(); - uint32_t len; - char * serialized = syncPingSerialize2(pMsg, &len); + uint32_t len; + char * serialized = syncPingSerialize2(pMsg, &len); SyncPing *pMsg2 = syncPingDeserialize2(serialized, len); syncPingPrint2((char *)"test3: syncPingSerialize3 -> syncPingDeserialize2 ", pMsg2); @@ -58,7 +58,7 @@ void test3() { void test4() { SyncPing *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPing2RpcMsg(pMsg, &rpcMsg); SyncPing *pMsg2 = (SyncPing *)malloc(rpcMsg.contLen); syncPingFromRpcMsg(&rpcMsg, pMsg2); @@ -70,7 +70,7 @@ void test4() { void test5() { SyncPing *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPing2RpcMsg(pMsg, &rpcMsg); SyncPing *pMsg2 = syncPingFromRpcMsg2(&rpcMsg); syncPingPrint2((char *)"test5: syncPing2RpcMsg -> syncPingFromRpcMsg2 ", pMsg2); diff --git a/source/libs/sync/test/syncRequestVoteReplyTest.cpp b/source/libs/sync/test/syncRequestVoteReplyTest.cpp index 52f54cbaaff901d92047c5da77c60469534c9c86..2bce3e4cd6b00e077e6e5abc54f54a94b357995e 100644 --- a/source/libs/sync/test/syncRequestVoteReplyTest.cpp +++ b/source/libs/sync/test/syncRequestVoteReplyTest.cpp @@ -22,6 +22,7 @@ SyncRequestVoteReply *createMsg() { pMsg->destId.vgId = 100; pMsg->term = 77; pMsg->voteGranted = true; + return pMsg; } void test1() { diff --git a/source/libs/sync/test/syncRequestVoteTest.cpp b/source/libs/sync/test/syncRequestVoteTest.cpp index e46eb87fe9bc465829146ddaef6eeb91a57c8ce4..7f75ee937b17fc1b36370ef40559c6197a53e77b 100644 --- a/source/libs/sync/test/syncRequestVoteTest.cpp +++ b/source/libs/sync/test/syncRequestVoteTest.cpp @@ -23,6 +23,7 @@ SyncRequestVote *createMsg() { pMsg->currentTerm = 11; pMsg->lastLogIndex = 22; pMsg->lastLogTerm = 33; + return pMsg; } void test1() { diff --git a/source/libs/sync/test/syncRpcMsgTest.cpp b/source/libs/sync/test/syncRpcMsgTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0331a29f226f379fe96b31f51a085986b5716414 --- /dev/null +++ b/source/libs/sync/test/syncRpcMsgTest.cpp @@ -0,0 +1,181 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int gg = 0; +SyncTimeout *createSyncTimeout() { + SyncTimeout *pMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, 999, 333, &gg); + return pMsg; +} + +SyncPing *createSyncPing() { + SRaftId srcId, destId; + srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + srcId.vgId = 100; + destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + destId.vgId = 100; + SyncPing *pMsg = syncPingBuild3(&srcId, &destId); + return pMsg; +} + +SyncPingReply *createSyncPingReply() { + SRaftId srcId, destId; + srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + srcId.vgId = 100; + destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + destId.vgId = 100; + SyncPingReply *pMsg = syncPingReplyBuild3(&srcId, &destId); + return pMsg; +} + +SyncClientRequest *createSyncClientRequest() { + SRpcMsg rpcMsg; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.msgType = 12345; + rpcMsg.contLen = 20; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + strcpy((char *)rpcMsg.pCont, "hello rpc"); + SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true); + return pMsg; +} + +SyncRequestVote *createSyncRequestVote() { + SyncRequestVote *pMsg = syncRequestVoteBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->currentTerm = 11; + pMsg->lastLogIndex = 22; + pMsg->lastLogTerm = 33; + return pMsg; +} + +SyncRequestVoteReply *createSyncRequestVoteReply() { + SyncRequestVoteReply *pMsg = syncRequestVoteReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 77; + pMsg->voteGranted = true; + return pMsg; +} + +SyncAppendEntries *createSyncAppendEntries() { + SyncAppendEntries *pMsg = syncAppendEntriesBuild(20); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 11; + pMsg->prevLogTerm = 22; + pMsg->commitIndex = 33; + strcpy(pMsg->data, "hello world"); + return pMsg; +} + +SyncAppendEntriesReply *createSyncAppendEntriesReply() { + SyncAppendEntriesReply *pMsg = syncAppendEntriesReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->success = true; + pMsg->matchIndex = 77; + return pMsg; +} + +void test1() { + SyncTimeout *pMsg = createSyncTimeout(); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test1", &rpcMsg); + syncTimeoutDestroy(pMsg); +} + +void test2() { + SyncPing *pMsg = createSyncPing(); + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test2", &rpcMsg); + syncPingDestroy(pMsg); +} + +void test3() { + SyncPingReply *pMsg = createSyncPingReply(); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test3", &rpcMsg); + syncPingReplyDestroy(pMsg); +} + +void test4() { + SyncRequestVote *pMsg = createSyncRequestVote(); + SRpcMsg rpcMsg; + syncRequestVote2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test4", &rpcMsg); + syncRequestVoteDestroy(pMsg); +} + +void test5() { + SyncRequestVoteReply *pMsg = createSyncRequestVoteReply(); + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test5", &rpcMsg); + syncRequestVoteReplyDestroy(pMsg); +} + +void test6() { + SyncAppendEntries *pMsg = createSyncAppendEntries(); + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test6", &rpcMsg); + syncAppendEntriesDestroy(pMsg); +} + +void test7() { + SyncAppendEntriesReply *pMsg = createSyncAppendEntriesReply(); + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test7", &rpcMsg); + syncAppendEntriesReplyDestroy(pMsg); +} + +void test8() { + SyncClientRequest *pMsg = createSyncClientRequest(); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test8", &rpcMsg); + syncClientRequestDestroy(pMsg); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + test6(); + test7(); + test8(); + + return 0; +}