From ebeb4bb7a60caaf9b082b1ae06df1dc440222822 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 10 Mar 2022 14:34:02 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncMessage.h | 3 - source/libs/sync/inc/syncRaftEntry.h | 12 ++-- source/libs/sync/inc/syncUtil.h | 3 + source/libs/sync/src/syncMessage.c | 2 +- source/libs/sync/src/syncRaftEntry.c | 44 ++++++++++++-- source/libs/sync/src/syncRaftLog.c | 19 ++++-- source/libs/sync/src/syncUtil.c | 36 +++++++++++ source/libs/sync/test/CMakeLists.txt | 16 +++++ source/libs/sync/test/syncEntryTest.cpp | 81 +++++++++++++++++++++++++ 9 files changed, 195 insertions(+), 21 deletions(-) create mode 100644 source/libs/sync/test/syncEntryTest.cpp diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index dcd8f7c74e..6231cb8399 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -130,9 +130,6 @@ typedef struct SyncClientRequest { char data[]; } SyncClientRequest; -#define SYNC_CLIENT_REQUEST_FIX_LEN \ - (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(uint32_t)) - SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); void syncClientRequestDestroy(SyncClientRequest* pMsg); void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 2d77af8835..b607849dc8 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -39,16 +39,14 @@ typedef struct SSyncRaftEntry { char data[]; } SSyncRaftEntry; -#define SYNC_ENTRY_FIX_LEN \ - (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(SyncTerm) + \ - sizeof(SyncIndex) + sizeof(uint32_t)) - -SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); +SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); +SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); void syncEntryDestory(SSyncRaftEntry* pEntry); -void syncEntrySerialize(const SSyncRaftEntry* pEntry, char* buf, uint32_t bufLen); -void syncEntryDeserialize(const char* buf, uint32_t len, SSyncRaftEntry* pEntry); +char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); +SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry); +void syncEntryPrint(const SSyncRaftEntry* pEntry); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index f2c979da99..2bbc1948dd 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -49,6 +49,9 @@ cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilRaftId2Json(const SRaftId* p); char* syncUtilRaftId2Str(const SRaftId* p); const char* syncUtilState2String(ESyncState state); +bool syncUtilCanPrint(char c); +char* syncUtilprintBin(char* ptr, uint32_t len); +char* syncUtilprintBin2(char* ptr, uint32_t len); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 11f823a048..baef49d748 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -348,7 +348,7 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) // ---- message process SyncClientRequest---- SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { - uint32_t bytes = SYNC_CLIENT_REQUEST_FIX_LEN + dataLen; + uint32_t bytes = sizeof(SyncClientRequest) + dataLen; SyncClientRequest* pMsg = malloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index cd14ea502e..7774cbb0ce 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -14,14 +14,22 @@ */ #include "syncRaftEntry.h" +#include "syncUtil.h" -SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { - uint32_t bytes = SYNC_ENTRY_FIX_LEN + pMsg->bytes; +SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SSyncRaftEntry) + dataLen; SSyncRaftEntry* pEntry = malloc(bytes); assert(pEntry != NULL); memset(pEntry, 0, bytes); - pEntry->bytes = bytes; + pEntry->dataLen = dataLen; + return pEntry; +} + +SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { + SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); + assert(pEntry != NULL); + pEntry->msgType = pMsg->msgType; pEntry->originalRpcType = pMsg->originalRpcType; pEntry->seqNum = pMsg->seqNum; @@ -40,14 +48,23 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) { } } -void syncEntrySerialize(const SSyncRaftEntry* pEntry, char* buf, uint32_t bufLen) { - assert(pEntry->bytes <= bufLen); +char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { + char* buf = malloc(pEntry->bytes); + assert(buf != NULL); memcpy(buf, pEntry, pEntry->bytes); + if (len != NULL) { + *len = pEntry->bytes; + } + return buf; } -void syncEntryDeserialize(const char* buf, uint32_t len, SSyncRaftEntry* pEntry) { +SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SSyncRaftEntry* pEntry = malloc(bytes); + assert(pEntry != NULL); memcpy(pEntry, buf, len); assert(len == pEntry->bytes); + return pEntry; } cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { @@ -66,6 +83,15 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { cJSON_AddStringToObject(pRoot, "index", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); + char* s; + s = syncUtilprintBin((char*)(pEntry->data), pEntry->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + + s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); + cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot); return pJson; @@ -76,4 +102,10 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) { char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; +} + +void syncEntryPrint(const SSyncRaftEntry* pEntry) { + char* s = syncEntry2Str(pEntry); + sTrace("%s", s); + free(s); } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index aa2595c199..d8a460629b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -47,13 +47,16 @@ void logStoreDestory(SSyncLogStore* pLogStore) { int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - char* buf = malloc(pEntry->bytes); - syncEntrySerialize(pEntry, buf, pEntry->bytes); - walWrite(pWal, pEntry->index, pEntry->msgType, buf, pEntry->bytes); + assert(pEntry->index == logStoreLastIndex(pLogStore) + 1); + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(serialized != NULL); + + walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); walFsync(pWal, true); - free(buf); + free(serialized); } // get one log entry, user need to free pEntry->pCont @@ -64,6 +67,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); walReadWithHandle(pWalHandle, index); + pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); + assert(pEntry != NULL); // need to hold, do not new every time!! walCloseReadHandle(pWalHandle); @@ -79,9 +84,15 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { // return index of last entry SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { + /* SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); SyncIndex lastIndex = pLastEntry->index; free(pLastEntry); + */ + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + int64_t last = walGetLastVer(pWal); + SyncIndex lastIndex = last < 0 ? 0 : last; return lastIndex; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 3fb430a714..736260cafc 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -148,4 +148,40 @@ const char* syncUtilState2String(ESyncState state) { } else { return "TAOS_SYNC_STATE_UNKNOWN"; } +} + +bool syncUtilCanPrint(char c) { + if (c >= 32 && c <= 126) { + return true; + } else { + return false; + } +} + +char* syncUtilprintBin(char* ptr, uint32_t len) { + char* s = malloc(len + 1); + assert(s != NULL); + memset(s, 0, len + 1); + memcpy(s, ptr, len); + + for (int i = 0; i < len; ++i) { + if (!syncUtilCanPrint(s[i])) { + s[i] = '.'; + } + } + return s; +} + +char* syncUtilprintBin2(char* ptr, uint32_t len) { + uint32_t len2 = len * 4 + 1; + char* s = malloc(len2); + assert(s != NULL); + memset(s, 0, len2); + + char* p = s; + for (int i = 0; i < len; ++i) { + int n = sprintf(p, "%d,", ptr[i]); + p += n; + } + return s; } \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 2163583f0a..bfde08ffac 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -16,6 +16,7 @@ add_executable(syncVotesGrantedTest "") add_executable(syncVotesRespondTest "") add_executable(syncIndexMgrTest "") add_executable(syncLogStoreTest "") +add_executable(syncEntryTest "") target_sources(syncTest @@ -90,6 +91,10 @@ target_sources(syncLogStoreTest PRIVATE "syncLogStoreTest.cpp" ) +target_sources(syncEntryTest + PRIVATE + "syncEntryTest.cpp" +) target_include_directories(syncTest @@ -182,6 +187,11 @@ target_include_directories(syncLogStoreTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEntryTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -256,6 +266,10 @@ target_link_libraries(syncLogStoreTest sync gtest_main ) +target_link_libraries(syncEntryTest + sync + gtest_main +) enable_testing() @@ -263,3 +277,5 @@ add_test( NAME sync_test COMMAND syncTest ) + + diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp new file mode 100644 index 0000000000..e54daaa79a --- /dev/null +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -0,0 +1,81 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void test1() { + SSyncRaftEntry* pEntry = syncEntryBuild(10); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100; + pEntry->index = 200; + strcpy(pEntry->data, "test1"); + + syncEntryPrint(pEntry); + syncEntryDestory(pEntry); +} + +void test2() { + SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); + pSyncMsg->originalRpcType = 33; + pSyncMsg->seqNum = 11; + pSyncMsg->isWeak = 1; + strcpy(pSyncMsg->data, "test2"); + + SSyncRaftEntry* pEntry = syncEntryBuild2(pSyncMsg, 100, 200); + syncEntryPrint(pEntry); + + syncClientRequestDestroy(pSyncMsg); + syncEntryDestory(pEntry); +} + +void test3() { + SSyncRaftEntry* pEntry = syncEntryBuild(10); + assert(pEntry != NULL); + pEntry->msgType = 11; + pEntry->originalRpcType = 22; + pEntry->seqNum = 33; + pEntry->isWeak = true; + pEntry->term = 44; + pEntry->index = 55; + strcpy(pEntry->data, "test3"); + syncEntryPrint(pEntry); + + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(serialized != NULL); + SSyncRaftEntry* pEntry2 = syncEntryDeserialize(serialized, len); + syncEntryPrint(pEntry2); + + free(serialized); + syncEntryDestory(pEntry2); + syncEntryDestory(pEntry); +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + test1(); + test2(); + test3(); + + return 0; +} -- GitLab