提交 317f7669 编写于 作者: M Minghao Li

sync refactor

上级 690dabd6
...@@ -47,6 +47,12 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); ...@@ -47,6 +47,12 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json(); cJSON* syncRpcUnknownMsg2Json();
char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); char* syncRpcMsg2Str(SRpcMsg* pRpcMsg);
// for debug ----------------------
void syncRpcMsgPrint(SRpcMsg* pMsg);
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
void syncRpcMsgLog(SRpcMsg* pMsg);
void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
// --------------------------------------------- // ---------------------------------------------
typedef enum ESyncTimeoutType { typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_PING = 100, SYNC_TIMEOUT_PING = 100,
......
...@@ -23,39 +23,47 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { ...@@ -23,39 +23,47 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
// in compiler optimization, switch case = if else constants // in compiler optimization, switch case = if else constants
if (pRpcMsg->msgType == SYNC_TIMEOUT) { if (pRpcMsg->msgType == SYNC_TIMEOUT) {
SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont; SyncTimeout* pSyncMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncTimeout2Json(pSyncMsg); pRoot = syncTimeout2Json(pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING) { } else if (pRpcMsg->msgType == SYNC_PING) {
SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; SyncPing* pSyncMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncPing2Json(pSyncMsg); pRoot = syncPing2Json(pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) { } else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont; SyncPingReply* pSyncMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncPingReply2Json(pSyncMsg); pRoot = syncPingReply2Json(pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
SyncClientRequest* pSyncMsg = (SyncClientRequest*)pRpcMsg->pCont; SyncClientRequest* pSyncMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncClientRequest2Json(pSyncMsg); pRoot = syncClientRequest2Json(pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) {
pRoot = syncRpcUnknownMsg2Json(); pRoot = syncRpcUnknownMsg2Json();
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont; SyncRequestVote* pSyncMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncRequestVote2Json(pSyncMsg); pRoot = syncRequestVote2Json(pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont; SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncRequestVoteReply2Json(pSyncMsg); pRoot = syncRequestVoteReply2Json(pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont; SyncAppendEntries* pSyncMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncAppendEntries2Json(pSyncMsg); pRoot = syncAppendEntries2Json(pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncAppendEntriesReply2Json(pSyncMsg); pRoot = syncAppendEntriesReply2Json(pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else { } else {
pRoot = syncRpcUnknownMsg2Json(); pRoot = syncRpcUnknownMsg2Json();
...@@ -72,7 +80,7 @@ cJSON* syncRpcUnknownMsg2Json() { ...@@ -72,7 +80,7 @@ cJSON* syncRpcUnknownMsg2Json() {
cJSON_AddStringToObject(pRoot, "data", "known message"); cJSON_AddStringToObject(pRoot, "data", "known message");
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot); cJSON_AddItemToObject(pJson, "SyncUnknown", pRoot);
return pJson; return pJson;
} }
...@@ -83,6 +91,33 @@ char* syncRpcMsg2Str(SRpcMsg* pRpcMsg) { ...@@ -83,6 +91,33 @@ char* syncRpcMsg2Str(SRpcMsg* pRpcMsg) {
return serialized; return serialized;
} }
// for debug ----------------------
void syncRpcMsgPrint(SRpcMsg* pMsg) {
char* serialized = syncRpcMsg2Str(pMsg);
printf("syncRpcMsgPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg) {
char* serialized = syncRpcMsg2Str(pMsg);
printf("syncRpcMsgPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncRpcMsgLog(SRpcMsg* pMsg) {
char* serialized = syncRpcMsg2Str(pMsg);
sTrace("syncRpcMsgLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncRpcMsgLog2(char* s, SRpcMsg* pMsg) {
char* serialized = syncRpcMsg2Str(pMsg);
sTrace("syncRpcMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
// ---- message process SyncTimeout---- // ---- message process SyncTimeout----
SyncTimeout* syncTimeoutBuild() { SyncTimeout* syncTimeoutBuild() {
uint32_t bytes = sizeof(SyncTimeout); uint32_t bytes = sizeof(SyncTimeout);
......
...@@ -25,6 +25,7 @@ add_executable(syncClientRequestTest "") ...@@ -25,6 +25,7 @@ add_executable(syncClientRequestTest "")
add_executable(syncTimeoutTest "") add_executable(syncTimeoutTest "")
add_executable(syncPingTest "") add_executable(syncPingTest "")
add_executable(syncPingReplyTest "") add_executable(syncPingReplyTest "")
add_executable(syncRpcMsgTest "")
target_sources(syncTest target_sources(syncTest
...@@ -135,6 +136,10 @@ target_sources(syncPingReplyTest ...@@ -135,6 +136,10 @@ target_sources(syncPingReplyTest
PRIVATE PRIVATE
"syncPingReplyTest.cpp" "syncPingReplyTest.cpp"
) )
target_sources(syncRpcMsgTest
PRIVATE
"syncRpcMsgTest.cpp"
)
target_include_directories(syncTest target_include_directories(syncTest
...@@ -272,6 +277,11 @@ target_include_directories(syncPingReplyTest ...@@ -272,6 +277,11 @@ target_include_directories(syncPingReplyTest
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncRpcMsgTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest target_link_libraries(syncTest
...@@ -382,6 +392,10 @@ target_link_libraries(syncPingReplyTest ...@@ -382,6 +392,10 @@ target_link_libraries(syncPingReplyTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncRpcMsgTest
sync
gtest_main
)
enable_testing() enable_testing()
......
...@@ -22,6 +22,7 @@ SyncAppendEntriesReply *createMsg() { ...@@ -22,6 +22,7 @@ SyncAppendEntriesReply *createMsg() {
pMsg->destId.vgId = 100; pMsg->destId.vgId = 100;
pMsg->success = true; pMsg->success = true;
pMsg->matchIndex = 77; pMsg->matchIndex = 77;
return pMsg;
} }
void test1() { void test1() {
......
...@@ -22,6 +22,7 @@ SyncRequestVoteReply *createMsg() { ...@@ -22,6 +22,7 @@ SyncRequestVoteReply *createMsg() {
pMsg->destId.vgId = 100; pMsg->destId.vgId = 100;
pMsg->term = 77; pMsg->term = 77;
pMsg->voteGranted = true; pMsg->voteGranted = true;
return pMsg;
} }
void test1() { void test1() {
......
...@@ -23,6 +23,7 @@ SyncRequestVote *createMsg() { ...@@ -23,6 +23,7 @@ SyncRequestVote *createMsg() {
pMsg->currentTerm = 11; pMsg->currentTerm = 11;
pMsg->lastLogIndex = 22; pMsg->lastLogIndex = 22;
pMsg->lastLogTerm = 33; pMsg->lastLogTerm = 33;
return pMsg;
} }
void test1() { void test1() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int gg = 0;
SyncTimeout *createSyncTimeout() {
SyncTimeout *pMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, 999, 333, &gg);
return pMsg;
}
SyncPing *createSyncPing() {
SRaftId srcId, destId;
srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
srcId.vgId = 100;
destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
destId.vgId = 100;
SyncPing *pMsg = syncPingBuild3(&srcId, &destId);
return pMsg;
}
SyncPingReply *createSyncPingReply() {
SRaftId srcId, destId;
srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
srcId.vgId = 100;
destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
destId.vgId = 100;
SyncPingReply *pMsg = syncPingReplyBuild3(&srcId, &destId);
return pMsg;
}
SyncClientRequest *createSyncClientRequest() {
SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.msgType = 12345;
rpcMsg.contLen = 20;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
strcpy((char *)rpcMsg.pCont, "hello rpc");
SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true);
return pMsg;
}
SyncRequestVote *createSyncRequestVote() {
SyncRequestVote *pMsg = syncRequestVoteBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->currentTerm = 11;
pMsg->lastLogIndex = 22;
pMsg->lastLogTerm = 33;
return pMsg;
}
SyncRequestVoteReply *createSyncRequestVoteReply() {
SyncRequestVoteReply *pMsg = syncRequestVoteReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 77;
pMsg->voteGranted = true;
return pMsg;
}
SyncAppendEntries *createSyncAppendEntries() {
SyncAppendEntries *pMsg = syncAppendEntriesBuild(20);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->prevLogIndex = 11;
pMsg->prevLogTerm = 22;
pMsg->commitIndex = 33;
strcpy(pMsg->data, "hello world");
return pMsg;
}
SyncAppendEntriesReply *createSyncAppendEntriesReply() {
SyncAppendEntriesReply *pMsg = syncAppendEntriesReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->success = true;
pMsg->matchIndex = 77;
return pMsg;
}
void test1() {
SyncTimeout *pMsg = createSyncTimeout();
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test1", &rpcMsg);
syncTimeoutDestroy(pMsg);
}
void test2() {
SyncPing *pMsg = createSyncPing();
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test2", &rpcMsg);
syncPingDestroy(pMsg);
}
void test3() {
SyncPingReply *pMsg = createSyncPingReply();
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test3", &rpcMsg);
syncPingReplyDestroy(pMsg);
}
void test4() {
SyncRequestVote *pMsg = createSyncRequestVote();
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test4", &rpcMsg);
syncRequestVoteDestroy(pMsg);
}
void test5() {
SyncRequestVoteReply *pMsg = createSyncRequestVoteReply();
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test5", &rpcMsg);
syncRequestVoteReplyDestroy(pMsg);
}
void test6() {
SyncAppendEntries *pMsg = createSyncAppendEntries();
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test6", &rpcMsg);
syncAppendEntriesDestroy(pMsg);
}
void test7() {
SyncAppendEntriesReply *pMsg = createSyncAppendEntriesReply();
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test7", &rpcMsg);
syncAppendEntriesReplyDestroy(pMsg);
}
void test8() {
SyncClientRequest *pMsg = createSyncClientRequest();
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test8", &rpcMsg);
syncClientRequestDestroy(pMsg);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
test6();
test7();
test8();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册