提交 981461ea 编写于 作者: M Minghao Li

refactor(sync): add PreSnapshotTest

上级 fd92f4b6
...@@ -270,6 +270,8 @@ enum { ...@@ -270,6 +270,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
#if defined(TD_MSG_NUMBER_) #if defined(TD_MSG_NUMBER_)
......
...@@ -513,6 +513,37 @@ void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg); ...@@ -513,6 +513,37 @@ void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg);
void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg); void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg);
void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg); void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg);
// ---------------------------------------------
typedef struct SyncPreSnapshot {
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
SRaftId srcId;
SRaftId destId;
// private data
SyncTerm term;
} SyncPreSnapshot;
SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId);
void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg);
void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen);
void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg);
char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len);
SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len);
void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg);
void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg);
SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg);
char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg);
// for debug ----------------------
void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg);
void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg);
void syncPreSnapshotLog(const SyncPreSnapshot* pMsg);
void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg);
// --------------------------------------------- // ---------------------------------------------
typedef struct SyncApplyMsg { typedef struct SyncApplyMsg {
uint32_t bytes; uint32_t bytes;
......
...@@ -2315,6 +2315,153 @@ void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg) { ...@@ -2315,6 +2315,153 @@ void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg) {
} }
} }
// ---- message process SyncPreSnapshot----
SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncPreSnapshot);
SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT;
return pMsg;
}
void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncPreSnapshotSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncPreSnapshotDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncPreSnapshotSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg) {
syncPreSnapshotDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncPreSnapshot* pMsg = syncPreSnapshotDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPreSnapshot", pRoot);
return pJson;
}
char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg) {
cJSON* pJson = syncPreSnapshot2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg) {
char* serialized = syncPreSnapshot2Str(pMsg);
printf("syncPreSnapshotPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg) {
char* serialized = syncPreSnapshot2Str(pMsg);
printf("syncPreSnapshotPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncPreSnapshotLog(const SyncPreSnapshot* pMsg) {
char* serialized = syncPreSnapshot2Str(pMsg);
sTrace("syncPreSnapshotLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncPreSnapshot2Str(pMsg);
sTrace("syncPreSnapshotLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ---- message process SyncApplyMsg---- // ---- message process SyncApplyMsg----
SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncApplyMsg) + dataLen; uint32_t bytes = sizeof(SyncApplyMsg) + dataLen;
......
...@@ -60,6 +60,7 @@ add_executable(syncRaftCfgIndexTest "") ...@@ -60,6 +60,7 @@ add_executable(syncRaftCfgIndexTest "")
add_executable(syncHeartbeatTest "") add_executable(syncHeartbeatTest "")
add_executable(syncHeartbeatReplyTest "") add_executable(syncHeartbeatReplyTest "")
add_executable(syncLocalCmdTest "") add_executable(syncLocalCmdTest "")
add_executable(syncPreSnapshotTest "")
target_sources(syncTest target_sources(syncTest
...@@ -310,6 +311,10 @@ target_sources(syncLocalCmdTest ...@@ -310,6 +311,10 @@ target_sources(syncLocalCmdTest
PRIVATE PRIVATE
"syncLocalCmdTest.cpp" "syncLocalCmdTest.cpp"
) )
target_sources(syncPreSnapshotTest
PRIVATE
"syncPreSnapshotTest.cpp"
)
target_include_directories(syncTest target_include_directories(syncTest
...@@ -622,6 +627,11 @@ target_include_directories(syncLocalCmdTest ...@@ -622,6 +627,11 @@ target_include_directories(syncLocalCmdTest
"${TD_SOURCE_DIR}/include/libs/sync" "${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncPreSnapshotTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest target_link_libraries(syncTest
...@@ -872,6 +882,10 @@ target_link_libraries(syncLocalCmdTest ...@@ -872,6 +882,10 @@ target_link_libraries(syncLocalCmdTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncPreSnapshotTest
sync
gtest_main
)
enable_testing() enable_testing()
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncPreSnapshot *createMsg() {
SyncPreSnapshot *pMsg = syncPreSnapshotBuild(789);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 9527;
return pMsg;
}
void test1() {
SyncPreSnapshot *pMsg = createMsg();
syncPreSnapshotLog2((char *)"test1:", pMsg);
syncPreSnapshotDestroy(pMsg);
}
void test2() {
SyncPreSnapshot *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)taosMemoryMalloc(len);
syncPreSnapshotSerialize(pMsg, serialized, len);
SyncPreSnapshot *pMsg2 = syncPreSnapshotBuild(789);
syncPreSnapshotDeserialize(serialized, len, pMsg2);
syncPreSnapshotLog2((char *)"test2: syncPreSnapshotSerialize -> syncPreSnapshotDeserialize ", pMsg2);
taosMemoryFree(serialized);
syncPreSnapshotDestroy(pMsg);
syncPreSnapshotDestroy(pMsg2);
}
void test3() {
SyncPreSnapshot *pMsg = createMsg();
uint32_t len;
char * serialized = syncPreSnapshotSerialize2(pMsg, &len);
SyncPreSnapshot *pMsg2 = syncPreSnapshotDeserialize2(serialized, len);
syncPreSnapshotLog2((char *)"test3: syncPreSnapshotSerialize2 -> syncPreSnapshotDeserialize2 ", pMsg2);
taosMemoryFree(serialized);
syncPreSnapshotDestroy(pMsg);
syncPreSnapshotDestroy(pMsg2);
}
void test4() {
SyncPreSnapshot *pMsg = createMsg();
SRpcMsg rpcMsg;
syncPreSnapshot2RpcMsg(pMsg, &rpcMsg);
SyncPreSnapshot *pMsg2 = (SyncPreSnapshot *)taosMemoryMalloc(rpcMsg.contLen);
syncPreSnapshotFromRpcMsg(&rpcMsg, pMsg2);
syncPreSnapshotLog2((char *)"test4: syncPreSnapshot2RpcMsg -> syncPreSnapshotFromRpcMsg ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncPreSnapshotDestroy(pMsg);
syncPreSnapshotDestroy(pMsg2);
}
void test5() {
SyncPreSnapshot *pMsg = createMsg();
SRpcMsg rpcMsg;
syncPreSnapshot2RpcMsg(pMsg, &rpcMsg);
SyncPreSnapshot *pMsg2 = syncPreSnapshotFromRpcMsg2(&rpcMsg);
syncPreSnapshotLog2((char *)"test5: syncPreSnapshot2RpcMsg -> syncPreSnapshotFromRpcMsg2 ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncPreSnapshotDestroy(pMsg);
syncPreSnapshotDestroy(pMsg2);
}
int main() {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
gRaftDetailLog = true;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册