提交 c67a14ad 编写于 作者: M Minghao Li

add raft store

上级 8b71c4f5
...@@ -40,7 +40,7 @@ typedef struct SSyncIO { ...@@ -40,7 +40,7 @@ typedef struct SSyncIO {
SEpSet epSet; SEpSet epSet;
void *syncTimer; void *syncTimer;
void *syncTimerManager; void *syncTimerManager;
int32_t (*start)(struct SSyncIO *ths); int32_t (*start)(struct SSyncIO *ths);
int32_t (*stop)(struct SSyncIO *ths); int32_t (*stop)(struct SSyncIO *ths);
......
...@@ -28,7 +28,7 @@ extern "C" { ...@@ -28,7 +28,7 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
typedef struct SRaftId { typedef struct SRaftId {
SyncNodeId nodeId; SyncNodeId addr;
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
......
...@@ -23,20 +23,36 @@ extern "C" { ...@@ -23,20 +23,36 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "cJSON.h"
#include "syncInt.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
void testJson(); #define RAFT_STORE_BLOCK_SIZE 512
void testJson2(); #define RAFT_STORE_PATH_LEN 128
int32_t currentTerm(SyncTerm *pCurrentTerm); typedef struct SRaftStore {
SyncTerm currentTerm;
SRaftId voteFor;
FileFd fd;
char path[RAFT_STORE_PATH_LEN];
} SRaftStore;
int32_t persistCurrentTerm(SyncTerm currentTerm); SRaftStore *raftStoreOpen(const char *path);
int32_t voteFor(SRaftId *pRaftId); static int32_t raftStoreInit(SRaftStore *pRaftStore);
int32_t persistVoteFor(SRaftId *pRaftId); int32_t raftStoreClose(SRaftStore *pRaftStore);
int32_t raftStorePersist(SRaftStore *pRaftStore);
static bool raftStoreFileExist(char *path);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
void raftStorePrint(SRaftStore *pRaftStore);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -45,7 +45,7 @@ void *syncConsumer(void *param) { ...@@ -45,7 +45,7 @@ void *syncConsumer(void *param) {
SSyncIO *io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg * pRpcMsg, rpcMsg;
int type; int type;
qall = taosAllocateQall(); qall = taosAllocateQall();
......
...@@ -17,50 +17,116 @@ ...@@ -17,50 +17,116 @@
#include "cJSON.h" #include "cJSON.h"
#include "sync.h" #include "sync.h"
FileFd raftStoreFd; SRaftStore *raftStoreOpen(const char *path) {
int32_t ret;
void testJson() { SRaftStore *pRaftStore = malloc(sizeof(SRaftStore));
raftStoreFd = taosOpenFileReadWrite("raft.store"); if (pRaftStore == NULL) {
sError("raftStoreOpen malloc error");
return NULL;
}
memset(pRaftStore, 0, sizeof(*pRaftStore));
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
uint64_t currentTerm = 100; char storeBuf[RAFT_STORE_BLOCK_SIZE];
uint64_t voteFor = 200; memset(storeBuf, 0, sizeof(storeBuf));
cJSON *pRoot = cJSON_CreateObject(); if (!raftStoreFileExist(pRaftStore->path)) {
cJSON_AddNumberToObject(pRoot, "current_term", currentTerm); ret = raftStoreInit(pRaftStore);
cJSON_AddNumberToObject(pRoot, "vote_for", voteFor); assert(ret == 0);
}
char *serialized = cJSON_Print(pRoot); pRaftStore->fd = taosOpenFileReadWrite(pRaftStore->path);
int len = strlen(serialized); if (pRaftStore->fd < 0) {
printf("serialized: %s \n", serialized); return NULL;
}
int len = taosReadFile(pRaftStore->fd, storeBuf, sizeof(storeBuf));
assert(len == RAFT_STORE_BLOCK_SIZE);
taosWriteFile(raftStoreFd, serialized, len); ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
assert(ret == 0);
return pRaftStore;
} }
void testJson2() { static int32_t raftStoreInit(SRaftStore *pRaftStore) {
taosLSeekFile(raftStoreFd, 0, SEEK_SET); pRaftStore->fd = taosOpenFileCreateWrite(pRaftStore->path);
if (pRaftStore->fd < 0) {
return -1;
}
char buf[128]; pRaftStore->currentTerm = 0;
memset(buf, 0, sizeof(buf)); pRaftStore->voteFor.addr = 0;
taosReadFile(raftStoreFd, buf, sizeof(buf)); pRaftStore->voteFor.vgId = 0;
printf("read file: %s \n", buf);
cJSON *pRoot = cJSON_Parse(buf); int32_t ret = raftStorePersist(pRaftStore);
assert(ret == 0);
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); taosCloseFile(pRaftStore->fd);
uint64_t currentTerm = pCurrentTerm->valueint; return 0;
}
int32_t raftStoreClose(SRaftStore *pRaftStore) {
taosCloseFile(pRaftStore->fd);
free(pRaftStore);
return 0;
}
cJSON *pVoteFor = cJSON_GetObjectItem(pRoot, "vote_for"); int32_t raftStorePersist(SRaftStore *pRaftStore) {
uint64_t voteFor = pVoteFor->valueint; int32_t ret;
char storeBuf[RAFT_STORE_BLOCK_SIZE];
printf("read json: currentTerm:%lu, voteFor:%lu \n", currentTerm, voteFor); ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
assert(ret == 0);
taosCloseFile(raftStoreFd); taosLSeekFile(pRaftStore->fd, 0, SEEK_SET);
ret = taosWriteFile(pRaftStore->fd, storeBuf, sizeof(storeBuf));
assert(ret == RAFT_STORE_BLOCK_SIZE);
fsync(pRaftStore->fd);
return 0;
} }
int32_t currentTerm(SyncTerm *pCurrentTerm) { return 0; } static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; }
int32_t persistCurrentTerm(SyncTerm currentTerm) { return 0; } int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm);
cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
int32_t voteFor(SRaftId *pRaftId) { return 0; } char *serialized = cJSON_Print(pRoot);
int len2 = strlen(serialized);
assert(len2 < len);
memset(buf, 0, len);
snprintf(buf, len, "%s", serialized);
free(serialized);
cJSON_Delete(pRoot);
return 0;
}
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
cJSON *pRoot = cJSON_Parse(buf);
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
pRaftStore->currentTerm = pCurrentTerm->valueint;
int32_t persistVoteFor(SRaftId *pRaftId) { return 0; } cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
\ No newline at end of file pRaftStore->voteFor.addr = pVoteForAddr->valueint;
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
cJSON_Delete(pRoot);
return 0;
}
void raftStorePrint(SRaftStore *pRaftStore) {
char storeBuf[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
printf("%s\n", storeBuf);
}
...@@ -14,9 +14,23 @@ void *pingFunc(void *param) { ...@@ -14,9 +14,23 @@ void *pingFunc(void *param) {
} }
int main() { int main() {
tsAsyncLog = 0;
taosInitLog((char *)"syncTest.log", 100000, 10);
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
assert(pRaftStore != NULL);
raftStorePrint(pRaftStore);
pRaftStore->currentTerm = 100;
pRaftStore->voteFor.addr = 200;
pRaftStore->voteFor.vgId = 300;
raftStorePrint(pRaftStore);
raftStorePersist(pRaftStore);
testJson();
testJson2();
tsAsyncLog = 0; tsAsyncLog = 0;
taosInitLog((char *)"syncTest.log", 100000, 10); taosInitLog((char *)"syncTest.log", 100000, 10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册