提交 ebeb4bb7 编写于 作者: M Minghao Li

sync refactor

上级 a51e5761
......@@ -130,9 +130,6 @@ typedef struct SyncClientRequest {
char data[];
} SyncClientRequest;
#define SYNC_CLIENT_REQUEST_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(uint32_t))
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
void syncClientRequestDestroy(SyncClientRequest* pMsg);
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
......
......@@ -39,16 +39,14 @@ typedef struct SSyncRaftEntry {
char data[];
} SSyncRaftEntry;
#define SYNC_ENTRY_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(SyncTerm) + \
sizeof(SyncIndex) + sizeof(uint32_t))
SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
void syncEntryDestory(SSyncRaftEntry* pEntry);
void syncEntrySerialize(const SSyncRaftEntry* pEntry, char* buf, uint32_t bufLen);
void syncEntryDeserialize(const char* buf, uint32_t len, SSyncRaftEntry* pEntry);
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len);
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len);
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
void syncEntryPrint(const SSyncRaftEntry* pEntry);
#ifdef __cplusplus
}
......
......@@ -49,6 +49,9 @@ cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
cJSON* syncUtilRaftId2Json(const SRaftId* p);
char* syncUtilRaftId2Str(const SRaftId* p);
const char* syncUtilState2String(ESyncState state);
bool syncUtilCanPrint(char c);
char* syncUtilprintBin(char* ptr, uint32_t len);
char* syncUtilprintBin2(char* ptr, uint32_t len);
#ifdef __cplusplus
}
......
......@@ -348,7 +348,7 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId)
// ---- message process SyncClientRequest----
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_CLIENT_REQUEST_FIX_LEN + dataLen;
uint32_t bytes = sizeof(SyncClientRequest) + dataLen;
SyncClientRequest* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
......
......@@ -14,14 +14,22 @@
*/
#include "syncRaftEntry.h"
#include "syncUtil.h"
SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
uint32_t bytes = SYNC_ENTRY_FIX_LEN + pMsg->bytes;
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SSyncRaftEntry) + dataLen;
SSyncRaftEntry* pEntry = malloc(bytes);
assert(pEntry != NULL);
memset(pEntry, 0, bytes);
pEntry->bytes = bytes;
pEntry->dataLen = dataLen;
return pEntry;
}
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
assert(pEntry != NULL);
pEntry->msgType = pMsg->msgType;
pEntry->originalRpcType = pMsg->originalRpcType;
pEntry->seqNum = pMsg->seqNum;
......@@ -40,14 +48,23 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) {
}
}
void syncEntrySerialize(const SSyncRaftEntry* pEntry, char* buf, uint32_t bufLen) {
assert(pEntry->bytes <= bufLen);
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
char* buf = malloc(pEntry->bytes);
assert(buf != NULL);
memcpy(buf, pEntry, pEntry->bytes);
if (len != NULL) {
*len = pEntry->bytes;
}
return buf;
}
void syncEntryDeserialize(const char* buf, uint32_t len, SSyncRaftEntry* pEntry) {
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SSyncRaftEntry* pEntry = malloc(bytes);
assert(pEntry != NULL);
memcpy(pEntry, buf, len);
assert(len == pEntry->bytes);
return pEntry;
}
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
......@@ -66,6 +83,15 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
cJSON_AddStringToObject(pRoot, "index", u64buf);
cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen);
char* s;
s = syncUtilprintBin((char*)(pEntry->data), pEntry->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
free(s);
s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
free(s);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot);
return pJson;
......@@ -77,3 +103,9 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
cJSON_Delete(pJson);
return serialized;
}
void syncEntryPrint(const SSyncRaftEntry* pEntry) {
char* s = syncEntry2Str(pEntry);
sTrace("%s", s);
free(s);
}
\ No newline at end of file
......@@ -47,13 +47,16 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
char* buf = malloc(pEntry->bytes);
syncEntrySerialize(pEntry, buf, pEntry->bytes);
walWrite(pWal, pEntry->index, pEntry->msgType, buf, pEntry->bytes);
assert(pEntry->index == logStoreLastIndex(pLogStore) + 1);
uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len);
assert(serialized != NULL);
walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len);
walFsync(pWal, true);
free(buf);
free(serialized);
}
// get one log entry, user need to free pEntry->pCont
......@@ -64,6 +67,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
walReadWithHandle(pWalHandle, index);
pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
assert(pEntry != NULL);
// need to hold, do not new every time!!
walCloseReadHandle(pWalHandle);
......@@ -79,9 +84,15 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
/*
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
SyncIndex lastIndex = pLastEntry->index;
free(pLastEntry);
*/
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
int64_t last = walGetLastVer(pWal);
SyncIndex lastIndex = last < 0 ? 0 : last;
return lastIndex;
}
......
......@@ -149,3 +149,39 @@ const char* syncUtilState2String(ESyncState state) {
return "TAOS_SYNC_STATE_UNKNOWN";
}
}
bool syncUtilCanPrint(char c) {
if (c >= 32 && c <= 126) {
return true;
} else {
return false;
}
}
char* syncUtilprintBin(char* ptr, uint32_t len) {
char* s = malloc(len + 1);
assert(s != NULL);
memset(s, 0, len + 1);
memcpy(s, ptr, len);
for (int i = 0; i < len; ++i) {
if (!syncUtilCanPrint(s[i])) {
s[i] = '.';
}
}
return s;
}
char* syncUtilprintBin2(char* ptr, uint32_t len) {
uint32_t len2 = len * 4 + 1;
char* s = malloc(len2);
assert(s != NULL);
memset(s, 0, len2);
char* p = s;
for (int i = 0; i < len; ++i) {
int n = sprintf(p, "%d,", ptr[i]);
p += n;
}
return s;
}
\ No newline at end of file
......@@ -16,6 +16,7 @@ add_executable(syncVotesGrantedTest "")
add_executable(syncVotesRespondTest "")
add_executable(syncIndexMgrTest "")
add_executable(syncLogStoreTest "")
add_executable(syncEntryTest "")
target_sources(syncTest
......@@ -90,6 +91,10 @@ target_sources(syncLogStoreTest
PRIVATE
"syncLogStoreTest.cpp"
)
target_sources(syncEntryTest
PRIVATE
"syncEntryTest.cpp"
)
target_include_directories(syncTest
......@@ -182,6 +187,11 @@ target_include_directories(syncLogStoreTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEntryTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -256,6 +266,10 @@ target_link_libraries(syncLogStoreTest
sync
gtest_main
)
target_link_libraries(syncEntryTest
sync
gtest_main
)
enable_testing()
......@@ -263,3 +277,5 @@ add_test(
NAME sync_test
COMMAND syncTest
)
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
void test1() {
SSyncRaftEntry* pEntry = syncEntryBuild(10);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100;
pEntry->index = 200;
strcpy(pEntry->data, "test1");
syncEntryPrint(pEntry);
syncEntryDestory(pEntry);
}
void test2() {
SyncClientRequest* pSyncMsg = syncClientRequestBuild(10);
pSyncMsg->originalRpcType = 33;
pSyncMsg->seqNum = 11;
pSyncMsg->isWeak = 1;
strcpy(pSyncMsg->data, "test2");
SSyncRaftEntry* pEntry = syncEntryBuild2(pSyncMsg, 100, 200);
syncEntryPrint(pEntry);
syncClientRequestDestroy(pSyncMsg);
syncEntryDestory(pEntry);
}
void test3() {
SSyncRaftEntry* pEntry = syncEntryBuild(10);
assert(pEntry != NULL);
pEntry->msgType = 11;
pEntry->originalRpcType = 22;
pEntry->seqNum = 33;
pEntry->isWeak = true;
pEntry->term = 44;
pEntry->index = 55;
strcpy(pEntry->data, "test3");
syncEntryPrint(pEntry);
uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len);
assert(serialized != NULL);
SSyncRaftEntry* pEntry2 = syncEntryDeserialize(serialized, len);
syncEntryPrint(pEntry2);
free(serialized);
syncEntryDestory(pEntry2);
syncEntryDestory(pEntry);
}
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
test1();
test2();
test3();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册