提交 b05706f9 编写于 作者: M Minghao Li

sync refactor

上级 e12d24d2
...@@ -27,6 +27,12 @@ extern "C" { ...@@ -27,6 +27,12 @@ extern "C" {
#include "syncMessage.h" #include "syncMessage.h"
#include "taosdef.h" #include "taosdef.h"
typedef enum EntryType {
SYNC_RAFT_ENTRY_NULL = 0,
SYNC_RAFT_ENTRY_DATA = 1,
SYNC_RAFT_ENTRY_CONFIG = 2,
} EntryType;
typedef struct SSyncRaftEntry { typedef struct SSyncRaftEntry {
uint32_t bytes; uint32_t bytes;
uint32_t msgType; uint32_t msgType;
...@@ -35,12 +41,14 @@ typedef struct SSyncRaftEntry { ...@@ -35,12 +41,14 @@ typedef struct SSyncRaftEntry {
bool isWeak; bool isWeak;
SyncTerm term; SyncTerm term;
SyncIndex index; SyncIndex index;
EntryType entryType;
uint32_t dataLen; uint32_t dataLen;
char data[]; char data[];
} SSyncRaftEntry; } SSyncRaftEntry;
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4 SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType);
void syncEntryDestory(SSyncRaftEntry* pEntry); void syncEntryDestory(SSyncRaftEntry* pEntry);
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5 char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6 SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
......
...@@ -28,6 +28,13 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { ...@@ -28,6 +28,13 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
// step 4. SyncClientRequest => SSyncRaftEntry, add term, index // step 4. SyncClientRequest => SSyncRaftEntry, add term, index
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild3(pMsg, term, index, SYNC_RAFT_ENTRY_DATA);
assert(pEntry != NULL);
return pEntry;
}
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType) {
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
assert(pEntry != NULL); assert(pEntry != NULL);
...@@ -37,6 +44,7 @@ SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncInde ...@@ -37,6 +44,7 @@ SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncInde
pEntry->isWeak = pMsg->isWeak; pEntry->isWeak = pMsg->isWeak;
pEntry->term = term; pEntry->term = term;
pEntry->index = index; pEntry->index = index;
pEntry->entryType = entryType;
pEntry->dataLen = pMsg->dataLen; pEntry->dataLen = pMsg->dataLen;
memcpy(pEntry->data, pMsg->data, pMsg->dataLen); memcpy(pEntry->data, pMsg->data, pMsg->dataLen);
...@@ -85,6 +93,7 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { ...@@ -85,6 +93,7 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index); snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index);
cJSON_AddStringToObject(pRoot, "index", u64buf); cJSON_AddStringToObject(pRoot, "index", u64buf);
cJSON_AddNumberToObject(pRoot, "entryType", pEntry->entryType);
cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen);
char* s; char* s;
......
...@@ -46,6 +46,20 @@ void test2() { ...@@ -46,6 +46,20 @@ void test2() {
} }
void test3() { void test3() {
SyncClientRequest* pSyncMsg = syncClientRequestBuild(10);
pSyncMsg->originalRpcType = 33;
pSyncMsg->seqNum = 11;
pSyncMsg->isWeak = 1;
strcpy(pSyncMsg->data, "test3");
SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NULL);
syncEntryPrint(pEntry);
syncClientRequestDestroy(pSyncMsg);
syncEntryDestory(pEntry);
}
void test4() {
SSyncRaftEntry* pEntry = syncEntryBuild(10); SSyncRaftEntry* pEntry = syncEntryBuild(10);
assert(pEntry != NULL); assert(pEntry != NULL);
pEntry->msgType = 11; pEntry->msgType = 11;
...@@ -54,7 +68,8 @@ void test3() { ...@@ -54,7 +68,8 @@ void test3() {
pEntry->isWeak = true; pEntry->isWeak = true;
pEntry->term = 44; pEntry->term = 44;
pEntry->index = 55; pEntry->index = 55;
strcpy(pEntry->data, "test3"); pEntry->entryType = SYNC_RAFT_ENTRY_CONFIG;
strcpy(pEntry->data, "test4");
syncEntryPrint(pEntry); syncEntryPrint(pEntry);
uint32_t len; uint32_t len;
...@@ -75,7 +90,8 @@ int main(int argc, char** argv) { ...@@ -75,7 +90,8 @@ int main(int argc, char** argv) {
test1(); test1();
test2(); test2();
test3(); test3();
test4();
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册