未验证 提交 c5db2827 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #9962 from taosdata/feature/3.0_mhli

TD-12799 add traft demo code
add_subdirectory(rebalance_leader) add_subdirectory(single_node)
add_subdirectory(make_cluster)
all:
gcc node10000.c -I ../../include/ ../../.libs/libraft.a -o node10000 -luv -llz4 -lpthread -g
gcc node10001.c -I ../../include/ ../../.libs/libraft.a -o node10001 -luv -llz4 -lpthread -g
gcc node10002.c -I ../../include/ ../../.libs/libraft.a -o node10002 -luv -llz4 -lpthread -g
gcc node10000_restart.c -I ../../include/ ../../.libs/libraft.a -o node10000_restart -luv -llz4 -lpthread -g
gcc node10001_restart.c -I ../../include/ ../../.libs/libraft.a -o node10001_restart -luv -llz4 -lpthread -g
gcc node10002_restart.c -I ../../include/ ../../.libs/libraft.a -o node10002_restart -luv -llz4 -lpthread -g
clean:
rm -f node10000
rm -f node10001
rm -f node10002
rm -f node10000_restart
rm -f node10001_restart
rm -f node10002_restart
sh clear.sh
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
// printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10000, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// add vgroup, id = 100, only has 3 replica.
// array <peers, peersCount> gives the peer replica infomation.
char peers[MAX_PEERS_COUNT][ADDRESS_LEN];
memset(peers, 0, sizeof(peers));
snprintf(peers[0], ADDRESS_LEN, "%s", "127.0.0.1:10001");
snprintf(peers[1], ADDRESS_LEN, "%s", "127.0.0.1:10002");
uint32_t peersCount = 2;
r = addRaftVoter(&raftEnv, peers, peersCount, 100, pFsm);
assert(r == 0);
// for test: submit a value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
// printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10000, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// add vgroup, id = 100, only has 3 replica.
// here only add self.
// peer replica information will restore from wal.
r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm);
assert(r == 0);
// for test: submit a value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
// printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10001, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// add vgroup, id = 100, only has 3 replica.
// array <peers, peersCount> gives the peer replica infomation.
char peers[MAX_PEERS_COUNT][ADDRESS_LEN];
memset(peers, 0, sizeof(peers));
snprintf(peers[0], ADDRESS_LEN, "%s", "127.0.0.1:10000");
snprintf(peers[1], ADDRESS_LEN, "%s", "127.0.0.1:10002");
uint32_t peersCount = 2;
r = addRaftVoter(&raftEnv, peers, peersCount, 100, pFsm);
assert(r == 0);
// for test: submit a value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
// printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10001, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// add vgroup, id = 100, only has 3 replica.
// here only add self.
// peer replica information will restore from wal.
r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm);
assert(r == 0);
// for test: submit a value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
// printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10002, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// add vgroup, id = 100, only has 3 replica.
// array <peers, peersCount> gives the peer replica infomation.
char peers[MAX_PEERS_COUNT][ADDRESS_LEN];
memset(peers, 0, sizeof(peers));
snprintf(peers[0], ADDRESS_LEN, "%s", "127.0.0.1:10000");
snprintf(peers[1], ADDRESS_LEN, "%s", "127.0.0.1:10001");
uint32_t peersCount = 2;
r = addRaftVoter(&raftEnv, peers, peersCount, 100, pFsm);
assert(r == 0);
// for test: submit a value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
// printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10002, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// add vgroup, id = 100, only has 3 replica.
// here only add self.
// peer replica information will restore from wal.
r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm);
assert(r == 0);
// for test: submit a value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
all:
gcc node_follower10000.c -I ../../include/ ../../.libs/libraft.a -o node_follower10000 -luv -llz4 -lpthread -g
gcc node_follower10001.c -I ../../include/ ../../.libs/libraft.a -o node_follower10001 -luv -llz4 -lpthread -g
gcc node_leader10002.c -I ../../include/ ../../.libs/libraft.a -o node_leader10002 -luv -llz4 -lpthread -g
clean:
rm -f node_follower10000
rm -f node_follower10001
rm -f node_leader10002
sh clear.sh
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10000, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// add one replica
r = addRaftSpare(&raftEnv, 100, pFsm);
assert(r == 0);
// for test: submit value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10001, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// add one replica
r = addRaftSpare(&raftEnv, 100, pFsm);
assert(r == 0);
// for test: submit value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
void joinRaftPeerCb(struct raft_change *req, int status) {
struct raft *r = req->data;
if (status != 0) {
fprintf(stderr, "joinRaftPeerCb error: %s \n", raft_errmsg(r));
} else {
fprintf(stderr, "joinRaftPeerCb ok \n");
}
raft_free(req);
}
int main(int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 10002, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// add one replica
r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm);
assert(r == 0);
printRaftState(getRaft(&raftEnv, 100));
// wait for being leader
while (1) {
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
if (r->state == RAFT_LEADER) {
break;
}
}
// join peers
r = joinRaftPeer(&raftEnv, 100, "127.0.0.1", 10000, joinRaftPeerCb);
assert(r == 0);
// wait for join over
sleep(2);
r = joinRaftPeer(&raftEnv, 100, "127.0.0.1", 10001, joinRaftPeerCb);
assert(r == 0);
// for test: submit value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
add_executable(makeCluster "")
target_sources(makeCluster
PRIVATE
"raftMain.c"
"raftServer.c"
"config.c"
"console.c"
"simpleHash.c"
"util.c"
)
target_link_libraries(makeCluster PUBLIC traft lz4 uv_a)
#ifndef TRAFT_COMMON_H
#define TRAFT_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#define COMMAND_LEN 512
#define MAX_CMD_COUNT 10
#define TOKEN_LEN 128
#define MAX_PEERS_COUNT 19
#define HOST_LEN 64
#define ADDRESS_LEN (HOST_LEN * 2)
#define BASE_DIR_LEN 128
#ifdef __cplusplus
}
#endif
#endif
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
void addrToString(const char *host, uint16_t port, char *addr, int len) { snprintf(addr, len, "%s:%hu", host, port); }
void parseAddr(const char *addr, char *host, int len, uint16_t *port) {
char *tmp = (char *)malloc(strlen(addr) + 1);
strcpy(tmp, addr);
char *context;
char *separator = ":";
char *token = strtok_r(tmp, separator, &context);
if (token) {
snprintf(host, len, "%s", token);
}
token = strtok_r(NULL, separator, &context);
if (token) {
sscanf(token, "%hu", port);
}
free(tmp);
}
int parseConf(int argc, char **argv, RaftServerConfig *pConf) {
memset(pConf, 0, sizeof(*pConf));
int option_index, option_value;
option_index = 0;
static struct option long_options[] = {{"help", no_argument, NULL, 'h'},
{"addr", required_argument, NULL, 'a'},
{"dir", required_argument, NULL, 'd'},
{NULL, 0, NULL, 0}};
while ((option_value = getopt_long(argc, argv, "ha:d:", long_options, &option_index)) != -1) {
switch (option_value) {
case 'a': {
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
break;
}
case 'd': {
snprintf(pConf->baseDir, sizeof(pConf->baseDir), "%s", optarg);
break;
}
case 'h': {
return -2;
}
default: { return -2; }
}
}
return 0;
}
void printConf(RaftServerConfig *pConf) {
printf("\n---printConf: \n");
printf("me: [%s:%hu] \n", pConf->me.host, pConf->me.port);
printf("dataDir: [%s] \n\n", pConf->baseDir);
}
#ifndef TRAFT_CONFIG_H
#define TRAFT_CONFIG_H
#ifdef __cplusplus
extern "C" {
#endif
#include <getopt.h>
#include <stdint.h>
#include "common.h"
typedef struct {
char host[HOST_LEN];
uint16_t port;
} Addr;
typedef struct {
Addr me;
char baseDir[BASE_DIR_LEN];
} RaftServerConfig;
void addrToString(const char *host, uint16_t port, char *addr, int len);
void parseAddr(const char *addr, char *host, int len, uint16_t *port);
int parseConf(int argc, char **argv, RaftServerConfig *pConf);
void printConf(RaftServerConfig *pConf);
#ifdef __cplusplus
}
#endif
#endif
#include "console.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "raftServer.h"
#include "util.h"
void printHelp() {
printf("---------------------\n");
printf("help: \n\n");
printf("create a vgroup with 3 replicas: \n");
printf("create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 \n");
printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10002 \n");
printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10001 \n");
printf("\n");
printf("create a vgroup with only one replica: \n");
printf("create vnode voter vid 200 \n");
printf("\n");
printf("add vnode into vgroup: \n");
printf("create vnode spare vid 100 ---- run at 127.0.0.1:10003\n");
printf("join vnode vid 100 addr 127.0.0.1:10003 ---- run at leader of vgroup 100\n");
printf("\n");
printf("run \n");
printf("put 0 key value \n");
printf("get 0 key \n");
printf("---------------------\n");
}
void console(RaftServer *pRaftServer) {
while (1) {
int ret;
char cmdBuf[COMMAND_LEN];
memset(cmdBuf, 0, sizeof(cmdBuf));
printf("(console)> ");
char *retp = fgets(cmdBuf, COMMAND_LEN, stdin);
if (!retp) {
exit(-1);
}
int pos = strlen(cmdBuf);
if (cmdBuf[pos - 1] == '\n') {
cmdBuf[pos - 1] = '\0';
}
if (strncmp(cmdBuf, "", COMMAND_LEN) == 0) {
continue;
}
char cmds[MAX_CMD_COUNT][TOKEN_LEN];
memset(cmds, 0, sizeof(cmds));
int cmdCount;
cmdCount = splitString(cmdBuf, " ", cmds, MAX_CMD_COUNT);
if (strcmp(cmds[0], "create") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[3], "vid") == 0) {
uint16_t vid;
sscanf(cmds[4], "%hu", &vid);
if (strcmp(cmds[2], "voter") == 0) {
char peers[MAX_PEERS_COUNT][ADDRESS_LEN];
memset(peers, 0, sizeof(peers));
uint32_t peersCount = 0;
if (strcmp(cmds[5], "peers") == 0 && cmdCount > 6) {
// create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002
for (int i = 6; i < cmdCount; ++i) {
snprintf(peers[i - 6], ADDRESS_LEN, "%s", cmds[i]);
peersCount++;
}
} else {
// create vnode voter vid 200
}
ret = addRaftVoter(pRaftServer, peers, peersCount, vid);
if (ret == 0) {
printf("create vnode voter ok \n");
} else {
printf("create vnode voter error \n");
}
} else if (strcmp(cmds[2], "spare") == 0) {
ret = addRaftSpare(pRaftServer, vid);
if (ret == 0) {
printf("create vnode spare ok \n");
} else {
printf("create vnode spare error \n");
}
} else {
printHelp();
}
} else if (strcmp(cmds[0], "join") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[2], "vid") == 0 &&
strcmp(cmds[4], "addr") == 0 && cmdCount == 6) {
// join vnode vid 100 addr 127.0.0.1:10004
char * address = cmds[5];
char host[64];
uint16_t port;
parseAddr(address, host, sizeof(host), &port);
uint16_t vid;
sscanf(cmds[3], "%hu", &vid);
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
if (*pp == NULL) {
printf("vid:%hu not found \n", vid);
break;
}
RaftInstance *pRaftInstance = (*pp)->data;
uint64_t destRaftId = encodeRaftId(host, port, vid);
struct raft_change *req = raft_malloc(sizeof(*req));
RaftJoin * pRaftJoin = raft_malloc(sizeof(*pRaftJoin));
pRaftJoin->r = &pRaftInstance->raft;
pRaftJoin->joinId = destRaftId;
req->data = pRaftJoin;
ret = raft_add(&pRaftInstance->raft, req, destRaftId, address, raftChangeAddCb);
if (ret != 0) {
printf("raft_add error: %s \n", raft_errmsg(&pRaftInstance->raft));
}
} else if (strcmp(cmds[0], "dropnode") == 0) {
} else if (strcmp(cmds[0], "state") == 0) {
pRaftServer->raftInstances.print(&pRaftServer->raftInstances);
for (size_t i = 0; i < pRaftServer->raftInstances.length; ++i) {
HashNode *ptr = pRaftServer->raftInstances.table[i];
if (ptr != NULL) {
while (ptr != NULL) {
RaftInstance *pRaftInstance = ptr->data;
printf("instance vid:%hu raftId:%llu \n", ptr->vgroupId, pRaftInstance->raftId);
printRaftState(&pRaftInstance->raft);
printf("\n");
ptr = ptr->next;
}
printf("\n");
}
}
} else if (strcmp(cmds[0], "put") == 0 && cmdCount == 4) {
uint16_t vid;
sscanf(cmds[1], "%hu", &vid);
char * key = cmds[2];
char * value = cmds[3];
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
if (*pp == NULL) {
printf("vid:%hu not found \n", vid);
break;
}
RaftInstance *pRaftInstance = (*pp)->data;
char *raftValue = malloc(TOKEN_LEN * 2 + 3);
snprintf(raftValue, TOKEN_LEN * 2 + 3, "%s--%s", key, value);
putValue(&pRaftInstance->raft, raftValue);
free(raftValue);
} else if (strcmp(cmds[0], "run") == 0) {
pthread_t tidRaftServer;
pthread_create(&tidRaftServer, NULL, startServerFunc, pRaftServer);
} else if (strcmp(cmds[0], "get") == 0 && cmdCount == 3) {
uint16_t vid;
sscanf(cmds[1], "%hu", &vid);
char * key = cmds[2];
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
if (*pp == NULL) {
printf("vid:%hu not found \n", vid);
break;
}
RaftInstance * pRaftInstance = (*pp)->data;
SimpleHash * pKV = pRaftInstance->fsm.data;
SimpleHashNode **ppNode = pKV->find_cstr(pKV, key);
if (*ppNode == NULL) {
printf("key:%s not found \n", key);
} else {
printf("find key:%s value:%s \n", key, (char *)((*ppNode)->data));
}
} else if (strcmp(cmds[0], "transfer") == 0) {
} else if (strcmp(cmds[0], "state") == 0) {
} else if (strcmp(cmds[0], "snapshot") == 0) {
} else if (strcmp(cmds[0], "exit") == 0) {
exit(0);
} else if (strcmp(cmds[0], "quit") == 0) {
exit(0);
} else if (strcmp(cmds[0], "help") == 0) {
printHelp();
} else {
printf("unknown command: %s \n", cmdBuf);
printHelp();
}
/*
printf("cmdBuf: [%s] \n", cmdBuf);
printf("cmdCount : %d \n", cmdCount);
for (int i = 0; i < MAX_CMD_COUNT; ++i) {
printf("cmd%d : %s \n", i, cmds[i]);
}
*/
}
}
#ifndef TRAFT_CONSOLE_H
#define TRAFT_CONSOLE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <getopt.h>
#include <stdint.h>
#include "common.h"
#include "raftServer.h"
void console(RaftServer *pRaftServer);
#ifdef __cplusplus
}
#endif
#endif
#include <assert.h>
#include <getopt.h>
#include <pthread.h>
#include <raft.h>
#include <raft/uv.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "common.h"
#include "config.h"
#include "console.h"
#include "raftServer.h"
#include "simpleHash.h"
#include "util.h"
const char *exe_name;
void *startConsoleFunc(void *param) {
RaftServer *pRaftServer = (RaftServer *)param;
console(pRaftServer);
return NULL;
}
void usage() {
printf("\nusage: \n");
printf("%s --addr=127.0.0.1:10000 --dir=./data \n", exe_name);
printf("\n");
}
RaftServerConfig gConfig;
RaftServer gRaftServer;
int main(int argc, char **argv) {
srand(time(NULL));
int32_t ret;
exe_name = argv[0];
if (argc < 3) {
usage();
exit(-1);
}
ret = parseConf(argc, argv, &gConfig);
if (ret != 0) {
usage();
exit(-1);
}
printConf(&gConfig);
if (!dirOK(gConfig.baseDir)) {
ret = mkdir(gConfig.baseDir, 0775);
if (ret != 0) {
fprintf(stderr, "mkdir error, %s \n", gConfig.baseDir);
exit(-1);
}
}
ret = raftServerInit(&gRaftServer, &gConfig);
if (ret != 0) {
fprintf(stderr, "raftServerInit error \n");
exit(-1);
}
/*
pthread_t tidRaftServer;
pthread_create(&tidRaftServer, NULL, startServerFunc, &gRaftServer);
*/
pthread_t tidConsole;
pthread_create(&tidConsole, NULL, startConsoleFunc, &gRaftServer);
while (1) {
sleep(10);
}
return 0;
}
#include "raftServer.h"
#include <stdlib.h>
#include <unistd.h>
#include "common.h"
#include "simpleHash.h"
#include "util.h"
void *startServerFunc(void *param) {
RaftServer *pRaftServer = (RaftServer *)param;
int32_t r = raftServerStart(pRaftServer);
assert(r == 0);
return NULL;
}
void raftChangeAssignCb(struct raft_change *req, int status) {
struct raft *r = req->data;
if (status != 0) {
printf("raftChangeAssignCb error: %s \n", raft_errmsg(r));
} else {
printf("raftChangeAssignCb ok \n");
}
raft_free(req);
}
void raftChangeAddCb(struct raft_change *req, int status) {
RaftJoin *pRaftJoin = req->data;
if (status != 0) {
printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r));
} else {
struct raft_change *req2 = raft_malloc(sizeof(*req2));
req2->data = pRaftJoin->r;
int ret = raft_assign(pRaftJoin->r, req2, pRaftJoin->joinId, RAFT_VOTER, raftChangeAssignCb);
if (ret != 0) {
printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r));
}
}
raft_free(req->data);
raft_free(req);
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) {
// get fsm data
SimpleHash *sh = pFsm->data;
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: [%s] \n", msg);
char arr[2][TOKEN_LEN];
int r = splitString(msg, "--", arr, 2);
assert(r == 2);
// do the value on fsm
sh->insert_cstr(sh, arr[0], arr[1]);
raft_free(buf->base);
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
struct raft *r = req->data;
if (status != 0) {
printf("putValueCb error: %s \n", raft_errmsg(r));
} else {
printf("putValueCb: %s \n", "ok");
}
raft_free(req);
}
void putValue(struct raft *r, const char *value) {
struct raft_buffer buf;
buf.len = strlen(value) + 1;
buf.base = raft_malloc(buf.len);
snprintf(buf.base, buf.len, "%s", value);
struct raft_apply *req = raft_malloc(sizeof(*req));
req->data = r;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
const char *state2String(unsigned short state) {
if (state == RAFT_UNAVAILABLE) {
return "RAFT_UNAVAILABLE";
} else if (state == RAFT_FOLLOWER) {
return "RAFT_FOLLOWER";
} else if (state == RAFT_CANDIDATE) {
return "RAFT_CANDIDATE";
} else if (state == RAFT_LEADER) {
return "RAFT_LEADER";
}
return "UNKNOWN_RAFT_STATE";
}
void printRaftConfiguration(struct raft_configuration *c) {
printf("configuration: \n");
for (int i = 0; i < c->n; ++i) {
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
}
}
void printRaftState(struct raft *r) {
printf("----Raft State: -----------\n");
printf("mem_addr: %p \n", r);
printf("my_id: %llu \n", r->id);
printf("address: %s \n", r->address);
printf("current_term: %llu \n", r->current_term);
printf("voted_for: %llu \n", r->voted_for);
printf("role: %s \n", state2String(r->state));
printf("commit_index: %llu \n", r->commit_index);
printf("last_applied: %llu \n", r->last_applied);
printf("last_stored: %llu \n", r->last_stored);
printf("configuration_index: %llu \n", r->configuration_index);
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
printRaftConfiguration(&r->configuration);
printf("----------------------------\n");
}
int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid) {
int ret;
RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance));
assert(pRaftInstance != NULL);
// init raftId
pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
// init dir
snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir,
pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId);
if (!dirOK(pRaftInstance->dir)) {
ret = mkdir(pRaftInstance->dir, 0775);
if (ret != 0) {
fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir);
assert(0);
}
}
// init fsm
pRaftInstance->fsm.data = newSimpleHash(2);
pRaftInstance->fsm.apply = fsmApplyCb;
// init io
ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport);
if (ret != 0) {
fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// init raft
ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId,
pRaftServer->address);
if (ret != 0) {
fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// init raft_configuration
struct raft_configuration conf;
raft_configuration_init(&conf);
raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_VOTER);
for (int i = 0; i < peersCount; ++i) {
char * peerAddress = peers[i];
char host[64];
uint16_t port;
parseAddr(peerAddress, host, sizeof(host), &port);
uint64_t raftId = encodeRaftId(host, port, vid);
raft_configuration_add(&conf, raftId, peers[i], RAFT_VOTER);
}
raft_bootstrap(&pRaftInstance->raft, &conf);
// start raft
ret = raft_start(&pRaftInstance->raft);
if (ret != 0) {
fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// add raft instance into raft server
pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance);
return 0;
}
int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid) {
int ret;
RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance));
assert(pRaftInstance != NULL);
// init raftId
pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
// init dir
snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir,
pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId);
ret = mkdir(pRaftInstance->dir, 0775);
if (ret != 0) {
fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir);
assert(0);
}
// init fsm
pRaftInstance->fsm.data = newSimpleHash(2);
pRaftInstance->fsm.apply = fsmApplyCb;
// init io
ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport);
if (ret != 0) {
fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// init raft
ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId,
pRaftServer->address);
if (ret != 0) {
fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// init raft_configuration
struct raft_configuration conf;
raft_configuration_init(&conf);
raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_SPARE);
raft_bootstrap(&pRaftInstance->raft, &conf);
// start raft
ret = raft_start(&pRaftInstance->raft);
if (ret != 0) {
fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// add raft instance into raft server
pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance);
return 0;
}
int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf) {
int ret;
// init host, port, address, dir
snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host);
pRaftServer->port = pConf->me.port;
snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port);
snprintf(pRaftServer->baseDir, sizeof(pRaftServer->baseDir), "%s", pConf->baseDir);
// init loop
ret = uv_loop_init(&pRaftServer->loop);
if (ret != 0) {
fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret));
assert(0);
}
// init network
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
if (ret != 0) {
fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret);
assert(0);
}
// init raft instance container
initIdHash(&pRaftServer->raftInstances, 2);
return 0;
}
int32_t raftServerStart(RaftServer *pRaftServer) {
// start loop
uv_run(&pRaftServer->loop, UV_RUN_DEFAULT);
return 0;
}
void raftServerStop(RaftServer *pRaftServer) {}
#ifndef TDENGINE_RAFT_SERVER_H
#define TDENGINE_RAFT_SERVER_H
#ifdef __cplusplus
extern "C" {
#endif
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include "common.h"
#include "config.h"
#include "raft.h"
#include "raft/uv.h"
#include "simpleHash.h"
typedef struct RaftJoin {
struct raft *r;
raft_id joinId;
} RaftJoin;
typedef struct {
raft_id raftId;
char dir[BASE_DIR_LEN * 2];
struct raft_fsm fsm;
struct raft_io io;
struct raft raft;
} RaftInstance;
typedef struct {
char host[HOST_LEN];
uint16_t port;
char address[ADDRESS_LEN]; /* Raft instance address */
char baseDir[BASE_DIR_LEN]; /* Raft instance address */
struct uv_loop_s loop; /* UV loop */
struct raft_uv_transport transport; /* UV I/O backend transport */
IdHash raftInstances; /* multi raft instances. traft use IdHash to manager multi vgroup inside, here we can use IdHash
too. */
} RaftServer;
void * startServerFunc(void *param);
int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid);
int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid);
int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf);
int32_t raftServerStart(RaftServer *pRaftServer);
void raftServerStop(RaftServer *pRaftServer);
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result);
void putValueCb(struct raft_apply *req, int status, void *result);
void putValue(struct raft *r, const char *value);
void raftChangeAddCb(struct raft_change *req, int status);
const char *state2String(unsigned short state);
void printRaftConfiguration(struct raft_configuration *c);
void printRaftState(struct raft *r);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RAFT_SERVER_H
#include "simpleHash.h"
uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed) {
// Similar to murmur hash
const uint32_t m = 0xc6a4a793;
const uint32_t r = 24;
const char* limit = data + n;
uint32_t h = seed ^ (n * m);
// Pick up four bytes at a time
while (data + 4 <= limit) {
// uint32_t w = DecodeFixed32(data);
uint32_t w;
memcpy(&w, data, 4);
data += 4;
h += w;
h *= m;
h ^= (h >> 16);
}
// Pick up remaining bytes
switch (limit - data) {
case 3:
h += (unsigned char)(data[2]) << 16;
do {
} while (0);
case 2:
h += (unsigned char)(data[1]) << 8;
do {
} while (0);
case 1:
h += (unsigned char)(data[0]);
h *= m;
h ^= (h >> r);
break;
}
return h;
}
int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data) {
return insertSimpleHash(ths, key, strlen(key) + 1, data, strlen(data) + 1);
}
int removeCStrSimpleHash(struct SimpleHash* ths, char* key) { return removeSimpleHash(ths, key, strlen(key) + 1); }
SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key) {
return findSimpleHash(ths, key, strlen(key) + 1);
}
int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen) {
SimpleHashNode** pp = ths->find(ths, key, keyLen);
if (*pp != NULL) {
fprintf(stderr, "insertSimpleHash, already has key \n");
return -1;
}
SimpleHashNode* node = malloc(sizeof(*node));
node->hashCode = ths->hashFunc(key, keyLen);
node->key = malloc(keyLen);
node->keyLen = keyLen;
memcpy(node->key, key, keyLen);
node->data = malloc(dataLen);
node->dataLen = dataLen;
memcpy(node->data, data, dataLen);
node->next = NULL;
// printf("insertSimpleHash: <%s, %ld, %s, %ld, %u> \n", node->key, node->keyLen, node->data, node->dataLen,
// node->hashCode);
size_t index = node->hashCode & (ths->length - 1);
SimpleHashNode* ptr = ths->table[index];
if (ptr != NULL) {
node->next = ptr;
ths->table[index] = node;
} else {
ths->table[index] = node;
}
ths->elems++;
if (ths->elems > 2 * ths->length) {
ths->resize(ths);
}
return 0;
}
int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) {
SimpleHashNode** pp = ths->find(ths, key, keyLen);
if (*pp == NULL) {
fprintf(stderr, "removeSimpleHash, key not exist \n");
return -1;
}
SimpleHashNode* del = *pp;
*pp = del->next;
free(del->key);
free(del->data);
free(del);
ths->elems--;
return 0;
}
SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) {
uint32_t hashCode = ths->hashFunc(key, keyLen);
// size_t index = hashCode % ths->length;
size_t index = hashCode & (ths->length - 1);
// printf("findSimpleHash: %s %ld %u \n", key, keyLen, hashCode);
SimpleHashNode** pp = &(ths->table[index]);
while (*pp != NULL && ((*pp)->hashCode != hashCode || memcmp(key, (*pp)->key, keyLen) != 0)) {
pp = &((*pp)->next);
}
return pp;
}
void printCStrSimpleHash(struct SimpleHash* ths) {
printf("\n--- printCStrSimpleHash: elems:%d length:%d \n", ths->elems, ths->length);
for (size_t i = 0; i < ths->length; ++i) {
SimpleHashNode* ptr = ths->table[i];
if (ptr != NULL) {
printf("%zu: ", i);
while (ptr != NULL) {
printf("<%u, %s, %ld, %s, %ld> ", ptr->hashCode, (char*)ptr->key, ptr->keyLen, (char*)ptr->data, ptr->dataLen);
ptr = ptr->next;
}
printf("\n");
}
}
printf("---------------\n");
}
void destroySimpleHash(struct SimpleHash* ths) {
for (size_t i = 0; i < ths->length; ++i) {
SimpleHashNode* ptr = ths->table[i];
while (ptr != NULL) {
SimpleHashNode* tmp = ptr;
ptr = ptr->next;
free(tmp->key);
free(tmp->data);
free(tmp);
}
}
ths->length = 0;
ths->elems = 0;
free(ths->table);
free(ths);
}
void resizeSimpleHash(struct SimpleHash* ths) {
uint32_t new_length = ths->length;
while (new_length < ths->elems) {
new_length *= 2;
}
printf("resizeSimpleHash: %p from %u to %u \n", ths, ths->length, new_length);
SimpleHashNode** new_table = malloc(new_length * sizeof(SimpleHashNode*));
memset(new_table, 0, new_length * sizeof(SimpleHashNode*));
uint32_t count = 0;
for (uint32_t i = 0; i < ths->length; i++) {
if (ths->table[i] == NULL) {
continue;
}
SimpleHashNode* it = ths->table[i];
while (it != NULL) {
SimpleHashNode* move_node = it;
it = it->next;
// move move_node
move_node->next = NULL;
size_t index = move_node->hashCode & (new_length - 1);
SimpleHashNode* ptr = new_table[index];
if (ptr != NULL) {
move_node->next = ptr;
new_table[index] = move_node;
} else {
new_table[index] = move_node;
}
count++;
}
}
assert(ths->elems == count);
free(ths->table);
ths->table = new_table;
ths->length = new_length;
}
uint32_t simpleHashFunc(const char* key, size_t keyLen) { return mySimpleHash(key, keyLen, 1); }
struct SimpleHash* newSimpleHash(size_t length) {
struct SimpleHash* ths = malloc(sizeof(*ths));
ths->length = length;
ths->elems = 0;
ths->table = malloc(length * sizeof(SimpleHashNode*));
memset(ths->table, 0, length * sizeof(SimpleHashNode*));
ths->insert = insertSimpleHash;
ths->remove = removeSimpleHash;
ths->find = findSimpleHash;
ths->insert_cstr = insertCStrSimpleHash;
ths->remove_cstr = removeCStrSimpleHash;
ths->find_cstr = findCStrSimpleHash;
ths->print_cstr = printCStrSimpleHash;
ths->destroy = destroySimpleHash;
ths->resize = resizeSimpleHash;
ths->hashFunc = simpleHashFunc;
}
#ifndef __SIMPLE_HASH_H__
#define __SIMPLE_HASH_H__
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed);
typedef struct SimpleHashNode {
uint32_t hashCode;
void* key;
size_t keyLen;
void* data;
size_t dataLen;
struct SimpleHashNode* next;
} SimpleHashNode;
typedef struct SimpleHash {
// public:
int (*insert)(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen);
int (*remove)(struct SimpleHash* ths, char* key, size_t keyLen);
SimpleHashNode** (*find)(struct SimpleHash* ths, char* key, size_t keyLen);
// wrapper
int (*insert_cstr)(struct SimpleHash* ths, char* key, char* data);
int (*remove_cstr)(struct SimpleHash* ths, char* key);
SimpleHashNode** (*find_cstr)(struct SimpleHash* ths, char* key);
void (*print_cstr)(struct SimpleHash* ths);
void (*destroy)(struct SimpleHash* ths);
uint32_t length;
uint32_t elems;
// private:
void (*resize)(struct SimpleHash* ths);
uint32_t (*hashFunc)(const char* key, size_t keyLen);
SimpleHashNode** table;
} SimpleHash;
int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data);
int removeCStrSimpleHash(struct SimpleHash* ths, char* key);
SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key);
void printCStrSimpleHash(struct SimpleHash* ths);
int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen);
int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen);
SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen);
void destroySimpleHash(struct SimpleHash* ths);
void resizeSimpleHash(struct SimpleHash* ths);
uint32_t simpleHashFunc(const char* key, size_t keyLen);
struct SimpleHash* newSimpleHash(size_t length);
#endif
#include "util.h"
#include <dirent.h>
#include <stdlib.h>
#include <string.h>
int dirOK(const char *path) {
DIR *dir = opendir(path);
if (dir != NULL) {
closedir(dir);
return 1;
} else {
return 0;
}
}
int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr) {
if (n_arr <= 0) {
return -1;
}
char *tmp = (char *)malloc(strlen(str) + 1);
strcpy(tmp, str);
char *context;
int n = 0;
char *token = strtok_r(tmp, separator, &context);
if (!token) {
goto ret;
}
strncpy(arr[n], token, TOKEN_LEN);
n++;
while (1) {
token = strtok_r(NULL, separator, &context);
if (!token || n >= n_arr) {
goto ret;
}
strncpy(arr[n], token, TOKEN_LEN);
n++;
}
ret:
free(tmp);
return n;
}
#ifndef TRAFT_UTIL_H
#define TRAFT_UTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "common.h"
int dirOK(const char *path);
int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr);
#ifdef __cplusplus
}
#endif
#endif
add_executable(rebalanceLeader "")
target_sources(rebalanceLeader
PRIVATE
"raftMain.c"
"raftServer.c"
)
target_link_libraries(rebalanceLeader PUBLIC traft lz4 uv_a)
#ifndef TDENGINE_COMMON_H
#define TDENGINE_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#define MAX_INSTANCE_NUM 100
#define MAX_PEERS 10
#define COMMAND_LEN 1024
#define TOKEN_LEN 128
#define DIR_LEN 256
#define HOST_LEN 64
#define ADDRESS_LEN (HOST_LEN + 16)
typedef struct {
char host[HOST_LEN];
uint32_t port;
} Addr;
typedef struct {
Addr me;
Addr peers[MAX_PEERS];
int peersCount;
char dir[DIR_LEN];
char dataDir[DIR_LEN + HOST_LEN * 2];
} SRaftServerConfig;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_COMMON_H
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <assert.h>
#include <getopt.h>
#include <time.h>
#include <stdlib.h>
#include <getopt.h>
#include <raft.h>
#include <raft/uv.h>
#include "raftServer.h"
#include "common.h"
const char *exe_name;
typedef struct LeaderState {
char address[48];
int leaderCount;
} LeaderState;
#define NODE_COUNT 3
LeaderState leaderStates[NODE_COUNT];
void printLeaderCount() {
for (int i = 0; i < NODE_COUNT; ++i) {
printf("%s: leaderCount:%d \n", leaderStates[i].address, leaderStates[i].leaderCount);
}
}
void updateLeaderStates(SRaftServer *pRaftServer) {
for (int i = 0; i < pRaftServer->instance[0].raft.configuration.n; ++i) {
snprintf(leaderStates[i].address, sizeof(leaderStates[i].address), "%s", pRaftServer->instance[0].raft.configuration.servers[i].address);
leaderStates[i].leaderCount = 0;
}
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
struct raft *r = &pRaftServer->instance[i].raft;
char leaderAddress[128];
memset(leaderAddress, 0, sizeof(leaderAddress));
if (r->state == RAFT_LEADER) {
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address);
} else if (r->state == RAFT_FOLLOWER) {
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address);
}
for (int j = 0; j < NODE_COUNT; j++) {
if (strcmp(leaderAddress, leaderStates[j].address) == 0) {
leaderStates[j].leaderCount++;
}
}
}
}
void raftTransferCb(struct raft_transfer *req) {
SRaftServer *pRaftServer = req->data;
raft_free(req);
//printf("raftTransferCb: \n");
updateLeaderStates(pRaftServer);
//printLeaderCount();
int myLeaderCount;
for (int i = 0; i < NODE_COUNT; ++i) {
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
myLeaderCount = leaderStates[i].leaderCount;
}
}
//printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
struct raft *r;
for (int j = 0; j < pRaftServer->instanceCount; ++j) {
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
r = &pRaftServer->instance[j].raft;
break;
}
}
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
transfer->data = pRaftServer;
uint64_t destRaftId;
int minIndex = -1;
int minLeaderCount = myLeaderCount;
for (int j = 0; j < NODE_COUNT; ++j) {
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) {
continue;
}
if (leaderStates[j].leaderCount <= minLeaderCount) {
minLeaderCount = leaderStates[j].leaderCount;
minIndex = j;
}
}
char myHost[48];
uint16_t myPort;
uint16_t myVid;
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
//printf("raftTransferCb transfer leader: vid[%u] choose: index:%d, leaderStates[%d].address:%s, leaderStates[%d].leaderCount:%d \n", minIndex, minIndex, leaderStates[minIndex].address, minIndex, leaderStates[minIndex].leaderCount);
char *destAddress = leaderStates[minIndex].address;
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
splitString(destAddress, ":", tokens, 2);
char *destHost = tokens[0];
uint16_t destPort = atoi(tokens[1]);
destRaftId = encodeRaftId(destHost, destPort, myVid);
printf("\nraftTransferCb transfer leader: vgroupId:%u from:%s:%u --> to:%s:%u ", myVid, myHost, myPort, destHost, destPort);
fflush(stdout);
raft_transfer(r, transfer, destRaftId, raftTransferCb);
}
}
void parseAddr(const char *addr, char *host, int len, uint32_t *port) {
char* tmp = (char*)malloc(strlen(addr) + 1);
strcpy(tmp, addr);
char* context;
char* separator = ":";
char* token = strtok_r(tmp, separator, &context);
if (token) {
snprintf(host, len, "%s", token);
}
token = strtok_r(NULL, separator, &context);
if (token) {
sscanf(token, "%u", port);
}
free(tmp);
}
// only parse 3 tokens
int parseCommand3(const char* str, char* token1, char* token2, char* token3, int len)
{
char* tmp = (char*)malloc(strlen(str) + 1);
strcpy(tmp, str);
char* context;
char* separator = " ";
int n = 0;
char* token = strtok_r(tmp, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token1, token, len);
n++;
}
token = strtok_r(NULL, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token2, token, len);
n++;
}
token = strtok_r(NULL, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token3, token, len);
n++;
}
ret:
return n;
free(tmp);
}
// only parse 4 tokens
int parseCommand4(const char* str, char* token1, char* token2, char* token3, char *token4, int len)
{
char* tmp = (char*)malloc(strlen(str) + 1);
strcpy(tmp, str);
char* context;
char* separator = " ";
int n = 0;
char* token = strtok_r(tmp, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token1, token, len);
n++;
}
token = strtok_r(NULL, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token2, token, len);
n++;
}
token = strtok_r(NULL, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token3, token, len);
n++;
}
token = strtok_r(NULL, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token4, token, len);
n++;
}
ret:
return n;
free(tmp);
}
void *startServerFunc(void *param) {
SRaftServer *pServer = (SRaftServer*)param;
int32_t r = raftServerStart(pServer);
assert(r == 0);
return NULL;
}
// Console ---------------------------------
const char* state2String(unsigned short state) {
if (state == RAFT_UNAVAILABLE) {
return "RAFT_UNAVAILABLE";
} else if (state == RAFT_FOLLOWER) {
return "RAFT_FOLLOWER";
} else if (state == RAFT_CANDIDATE) {
return "RAFT_CANDIDATE";
} else if (state == RAFT_LEADER) {
return "RAFT_LEADER";
}
return "UNKNOWN_RAFT_STATE";
}
void printRaftState2(struct raft *r) {
char leaderAddress[128];
memset(leaderAddress, 0, sizeof(leaderAddress));
if (r->state == RAFT_LEADER) {
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address);
} else if (r->state == RAFT_FOLLOWER) {
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address);
}
for (int i = 0; i < r->configuration.n; ++i) {
char tmpAddress[128];
snprintf(tmpAddress, sizeof(tmpAddress), "%s", r->configuration.servers[i].address);
uint64_t raftId = r->configuration.servers[i].id;
char host[128];
uint16_t port;
uint16_t vid;
decodeRaftId(raftId, host, 128, &port, &vid);
char buf[512];
memset(buf, 0, sizeof(buf));
if (strcmp(tmpAddress, leaderAddress) == 0) {
snprintf(buf, sizeof(buf), "<%s:%u-%u-LEADER>\t", host, port, vid);
} else {
snprintf(buf, sizeof(buf), "<%s:%u-%u-FOLLOWER>\t", host, port, vid);
}
printf("%s", buf);
}
printf("\n");
}
void printRaftConfiguration(struct raft_configuration *c) {
printf("configuration: \n");
for (int i = 0; i < c->n; ++i) {
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
}
}
void printRaftState(struct raft *r) {
printf("----Raft State: -----------\n");
printf("mem_addr: %p \n", r);
printf("my_id: %llu \n", r->id);
printf("address: %s \n", r->address);
printf("current_term: %llu \n", r->current_term);
printf("voted_for: %llu \n", r->voted_for);
printf("role: %s \n", state2String(r->state));
printf("commit_index: %llu \n", r->commit_index);
printf("last_applied: %llu \n", r->last_applied);
printf("last_stored: %llu \n", r->last_stored);
printf("configuration_index: %llu \n", r->configuration_index);
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
printRaftConfiguration(&r->configuration);
printf("----------------------------\n");
}
void putValueCb(struct raft_apply *req, int status, void *result) {
raft_free(req);
struct raft *r = req->data;
if (status != 0) {
printf("putValueCb: %s \n", raft_errmsg(r));
} else {
printf("putValueCb: %s \n", "ok");
}
}
void putValue(struct raft *r, const char *value) {
struct raft_buffer buf;
buf.len = TOKEN_LEN;;
buf.base = raft_malloc(buf.len);
snprintf(buf.base, buf.len, "%s", value);
struct raft_apply *req = raft_malloc(sizeof(struct raft_apply));
req->data = r;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char*)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
void getValue(const char *key) {
char *ptr = getKV(key);
if (ptr) {
printf("get value: [%s] \n", ptr);
} else {
printf("value not found for key: [%s] \n", key);
}
}
void console(SRaftServer *pRaftServer) {
while (1) {
char cmd_buf[COMMAND_LEN];
memset(cmd_buf, 0, sizeof(cmd_buf));
printf("(console)> ");
char *ret = fgets(cmd_buf, COMMAND_LEN, stdin);
if (!ret) {
exit(-1);
}
int pos = strlen(cmd_buf);
if(cmd_buf[pos - 1] == '\n') {
cmd_buf[pos - 1] = '\0';
}
if (strncmp(cmd_buf, "", COMMAND_LEN) == 0) {
continue;
}
char cmd[TOKEN_LEN];
memset(cmd, 0, sizeof(cmd));
char param1[TOKEN_LEN];
memset(param1, 0, sizeof(param1));
char param2[TOKEN_LEN];
memset(param2, 0, sizeof(param2));
char param3[TOKEN_LEN];
memset(param2, 0, sizeof(param2));
parseCommand4(cmd_buf, cmd, param1, param2, param3, TOKEN_LEN);
if (strcmp(cmd, "addnode") == 0) {
printf("not support \n");
/*
char host[HOST_LEN];
uint32_t port;
parseAddr(param1, host, HOST_LEN, &port);
uint64_t rid = raftId(host, port);
struct raft_change *req = raft_malloc(sizeof(*req));
int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL);
if (r != 0) {
printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft));
}
printf("add node: %lu %s \n", rid, param1);
struct raft_change *req2 = raft_malloc(sizeof(*req2));
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL);
if (r != 0) {
printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft));
}
*/
} else if (strcmp(cmd, "dropnode") == 0) {
printf("not support \n");
} else if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) {
exit(0);
} else if (strcmp(cmd, "rebalance") == 0 && strcmp(param1, "leader") == 0) {
/*
updateLeaderStates(pRaftServer);
int myLeaderCount;
for (int i = 0; i < NODE_COUNT; ++i) {
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
myLeaderCount = leaderStates[i].leaderCount;
}
}
while (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
struct raft *r;
for (int j = 0; j < pRaftServer->instanceCount; ++j) {
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
r = &pRaftServer->instance[j].raft;
}
}
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
transfer->data = pRaftServer;
uint64_t destRaftId;
int minIndex = -1;
int minLeaderCount = myLeaderCount;
for (int j = 0; j < NODE_COUNT; ++j) {
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue;
printf("-----leaderStates[%d].leaderCount:%d \n", j, leaderStates[j].leaderCount);
if (leaderStates[j].leaderCount <= minLeaderCount) {
minIndex = j;
printf("++++ assign minIndex : %d \n", minIndex);
}
}
printf("minIndex:%d minLeaderCount:%d \n", minIndex, minLeaderCount);
char myHost[48];
uint16_t myPort;
uint16_t myVid;
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
char *destAddress = leaderStates[minIndex].address;
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
splitString(destAddress, ":", tokens, 2);
char *destHost = tokens[0];
uint16_t destPort = atoi(tokens[1]);
destRaftId = encodeRaftId(destHost, destPort, myVid);
printf("destHost:%s destPort:%u myVid:%u", destHost, destPort, myVid);
raft_transfer(r, transfer, destRaftId, raftTransferCb);
sleep(1);
for (int i = 0; i < NODE_COUNT; ++i) {
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
myLeaderCount = leaderStates[i].leaderCount;
}
}
}
*/
int leaderCount = 0;
struct raft *firstR;
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
struct raft *r = &pRaftServer->instance[i].raft;
if (r->state == RAFT_LEADER) {
leaderCount++;
firstR = r;
}
}
if (leaderCount > pRaftServer->instanceCount / NODE_COUNT) {
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
transfer->data = pRaftServer;
raft_transfer(firstR, transfer, 0, raftTransferCb);
}
} else if (strcmp(cmd, "put") == 0) {
char buf[256];
uint16_t vid;
sscanf(param1, "%hu", &vid);
snprintf(buf, sizeof(buf), "%s--%s", param2, param3);
putValue(&pRaftServer->instance[vid].raft, buf);
} else if (strcmp(cmd, "get") == 0) {
getValue(param1);
} else if (strcmp(cmd, "transfer") == 0) {
uint16_t vid;
sscanf(param1, "%hu", &vid);
struct raft_transfer transfer;
raft_transfer(&pRaftServer->instance[vid].raft, &transfer, 0, NULL);
} else if (strcmp(cmd, "state") == 0) {
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
printf("instance %d: ", i);
printRaftState(&pRaftServer->instance[i].raft);
}
} else if (strcmp(cmd, "leader") == 0 && strcmp(param1, "state") == 0) {
updateLeaderStates(pRaftServer);
printf("\n--------------------------------------------\n");
printLeaderCount();
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
printRaftState2(&pRaftServer->instance[i].raft);
}
printf("--------------------------------------------\n");
} else if (strcmp(cmd, "snapshot") == 0) {
printf("not support \n");
} else if (strcmp(cmd, "help") == 0) {
printf("addnode \"127.0.0.1:8888\" \n");
printf("dropnode \"127.0.0.1:8888\" \n");
printf("put key value \n");
printf("get key \n");
printf("state \n");
} else {
printf("unknown command: [%s], type \"help\" to see help \n", cmd);
}
//printf("cmd_buf: [%s] \n", cmd_buf);
}
}
void *startConsoleFunc(void *param) {
SRaftServer *pServer = (SRaftServer*)param;
console(pServer);
return NULL;
}
// Config ---------------------------------
void usage() {
printf("\nusage: \n");
printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name);
printf("\n");
printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name);
printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name);
printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name);
printf("\n");
}
void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
memset(pConf, 0, sizeof(*pConf));
int option_index, option_value;
option_index = 0;
static struct option long_options[] = {
{"help", no_argument, NULL, 'h'},
{"peers", required_argument, NULL, 'p'},
{"me", required_argument, NULL, 'm'},
{"dir", required_argument, NULL, 'd'},
{NULL, 0, NULL, 0}
};
while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) {
switch (option_value) {
case 'm': {
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
break;
}
case 'p': {
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
int peerCount = splitString(optarg, ",", tokens, MAX_PEERS);
pConf->peersCount = peerCount;
for (int i = 0; i < peerCount; ++i) {
Addr *pAddr = &pConf->peers[i];
parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port);
}
break;
}
case 'd': {
snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg);
break;
}
case 'h': {
usage();
exit(-1);
}
default: {
usage();
exit(-1);
}
}
}
snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s_%u", pConf->dir, pConf->me.host, pConf->me.port);
}
void printConf(SRaftServerConfig *pConf) {
printf("\nconf: \n");
printf("me: %s:%u \n", pConf->me.host, pConf->me.port);
printf("peersCount: %d \n", pConf->peersCount);
for (int i = 0; i < pConf->peersCount; ++i) {
Addr *pAddr = &pConf->peers[i];
printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port);
}
printf("dataDir: %s \n\n", pConf->dataDir);
}
int main(int argc, char **argv) {
srand(time(NULL));
int32_t ret;
exe_name = argv[0];
if (argc < 3) {
usage();
exit(-1);
}
SRaftServerConfig conf;
parseConf(argc, argv, &conf);
printConf(&conf);
signal(SIGPIPE, SIG_IGN);
/*
char cmd_buf[COMMAND_LEN];
snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir);
system(cmd_buf);
*/
struct raft_fsm fsm;
initFsm(&fsm);
SRaftServer raftServer;
ret = raftServerInit(&raftServer, &conf, &fsm);
assert(ret == 0);
pthread_t tidRaftServer;
pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer);
pthread_t tidConsole;
pthread_create(&tidConsole, NULL, startConsoleFunc, &raftServer);
while (1) {
sleep(10);
}
return 0;
}
#include <unistd.h>
#include <stdlib.h>
#include "common.h"
#include "raftServer.h"
//char *keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);;
//char *values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
char keys[MAX_KV_LEN][MAX_RECORD_COUNT];
char values[MAX_KV_LEN][MAX_RECORD_COUNT];
int writeIndex = 0;
void initStore() {
}
void destroyStore() {
//free(keys);
//free(values);
}
void putKV(const char *key, const char *value) {
if (writeIndex < MAX_RECORD_COUNT) {
strncpy(keys[writeIndex], key, MAX_KV_LEN);
strncpy(values[writeIndex], value, MAX_KV_LEN);
writeIndex++;
}
}
char *getKV(const char *key) {
for (int i = 0; i < MAX_RECORD_COUNT; ++i) {
if (strcmp(keys[i], key) == 0) {
return values[i];
}
}
return NULL;
}
int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr)
{
if (n_arr <= 0) {
return -1;
}
char* tmp = (char*)malloc(strlen(str) + 1);
strcpy(tmp, str);
char* context;
int n = 0;
char* token = strtok_r(tmp, separator, &context);
if (!token) {
goto ret;
}
strncpy(arr[n], token, MAX_TOKEN_LEN);
n++;
while (1) {
token = strtok_r(NULL, separator, &context);
if (!token || n >= n_arr) {
goto ret;
}
strncpy(arr[n], token, MAX_TOKEN_LEN);
n++;
}
ret:
free(tmp);
return n;
}
/*
uint64_t raftId(const char *host, uint32_t port) {
uint32_t host_uint32 = (uint32_t)inet_addr(host);
assert(host_uint32 != (uint32_t)-1);
uint64_t code = ((uint64_t)host_uint32) << 32 | port;
return code;
}
*/
/*
uint64_t encodeRaftId(const char *host, uint16_t port, uint16_t vid) {
uint64_t raftId;
uint32_t host_uint32 = (uint32_t)inet_addr(host);
assert(host_uint32 != (uint32_t)-1);
raftId = (((uint64_t)host_uint32) << 32) | (((uint32_t)port) << 16) | vid;
return raftId;
}
void decodeRaftId(uint64_t raftId, char *host, int32_t len, uint16_t *port, uint16_t *vid) {
uint32_t host32 = (uint32_t)((raftId >> 32) & 0x00000000FFFFFFFF);
struct in_addr addr;
addr.s_addr = host32;
snprintf(host, len, "%s", inet_ntoa(addr));
*port = (uint16_t)((raftId & 0x00000000FFFF0000) >> 16);
*vid = (uint16_t)(raftId & 0x000000000000FFFF);
}
*/
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) {
int ret;
snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host);
pRaftServer->port = pConf->me.port;
snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port);
//strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir));
ret = uv_loop_init(&pRaftServer->loop);
if (ret != 0) {
fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret));
assert(0);
}
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
if (ret != 0) {
fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret);
assert(0);
}
uint16_t vid;
pRaftServer->instanceCount = 20;
for (int i = 0; i < pRaftServer->instanceCount; ++i)
{
//vid = 0;
vid = i;
pRaftServer->instance[vid].raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
snprintf(pRaftServer->instance[vid].dir, sizeof(pRaftServer->instance[vid].dir), "%s_%llu", pConf->dataDir, pRaftServer->instance[vid].raftId);
char cmd_buf[COMMAND_LEN];
snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", pRaftServer->instance[vid].dir);
system(cmd_buf);
sleep(1);
pRaftServer->instance[vid].fsm = pFsm;
ret = raft_uv_init(&pRaftServer->instance[vid].io, &pRaftServer->loop, pRaftServer->instance[vid].dir, &pRaftServer->transport);
if (ret != 0) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft));
assert(0);
}
ret = raft_init(&pRaftServer->instance[vid].raft, &pRaftServer->instance[vid].io, pRaftServer->instance[vid].fsm, pRaftServer->instance[vid].raftId, pRaftServer->address);
if (ret != 0) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft));
assert(0);
}
struct raft_configuration conf;
raft_configuration_init(&conf);
raft_configuration_add(&conf, pRaftServer->instance[vid].raftId, pRaftServer->address, RAFT_VOTER);
printf("add myself: %llu - %s \n", pRaftServer->instance[vid].raftId, pRaftServer->address);
for (int i = 0; i < pConf->peersCount; ++i) {
const Addr *pAddr = &pConf->peers[i];
raft_id rid = encodeRaftId(pAddr->host, pAddr->port, vid);
char addrBuf[ADDRESS_LEN];
snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port);
raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER);
printf("add peers: %llu - %s \n", rid, addrBuf);
}
raft_bootstrap(&pRaftServer->instance[vid].raft, &conf);
}
return 0;
}
int32_t raftServerStart(SRaftServer *pRaftServer) {
int ret;
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
ret = raft_start(&pRaftServer->instance[i].raft);
if (ret != 0) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[i].raft));
}
}
uv_run(&pRaftServer->loop, UV_RUN_DEFAULT);
}
void raftServerClose(SRaftServer *pRaftServer) {
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) {
char *msg = (char*)buf->base;
printf("fsm apply: %s \n", msg);
char arr[2][MAX_TOKEN_LEN];
splitString(msg, "--", arr, 2);
putKV(arr[0], arr[1]);
return 0;
}
int32_t initFsm(struct raft_fsm *fsm) {
initStore();
fsm->apply = fsmApplyCb;
return 0;
}
#ifndef TDENGINE_RAFT_SERVER_H
#define TDENGINE_RAFT_SERVER_H
#ifdef __cplusplus
extern "C" {
#endif
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <string.h>
#include "raft.h"
#include "raft/uv.h"
#include "common.h"
// simulate a db store, just for test
#define MAX_KV_LEN 20
#define MAX_RECORD_COUNT 16
//char *keys;
//char *values;
//int writeIndex;
void initStore();
void destroyStore();
void putKV(const char *key, const char *value);
char *getKV(const char *key);
typedef struct {
char dir[DIR_LEN + HOST_LEN * 4]; /* Data dir of UV I/O backend */
raft_id raftId; /* For vote */
struct raft_fsm *fsm; /* Sample application FSM */
struct raft raft; /* Raft instance */
struct raft_io io; /* UV I/O backend */
} SInstance;
typedef struct {
char host[HOST_LEN];
uint32_t port;
char address[ADDRESS_LEN]; /* Raft instance address */
struct uv_loop_s loop; /* UV loop */
struct raft_uv_transport transport; /* UV I/O backend transport */
SInstance instance[MAX_INSTANCE_NUM];
int32_t instanceCount;
} SRaftServer;
#define MAX_TOKEN_LEN 32
int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr);
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm);
int32_t raftServerStart(SRaftServer *pRaftServer);
void raftServerClose(SRaftServer *pRaftServer);
int initFsm(struct raft_fsm *fsm);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RAFT_SERVER_H
add_executable(singleNode "")
target_sources(singleNode
PRIVATE
"singleNode.c"
)
target_link_libraries(singleNode PUBLIC traft lz4 uv_a)
#!/bin/bash
rm -rf 127.0.0.1*
rm -rf ./data
all:
gcc singleNode.c -I ../../include/ ../../.libs/libraft.a -o singleNode -luv -llz4 -lpthread -g
clean:
rm -f singleNode
sh clear.sh
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
//#include <uv.h>
#include "raft.h"
SRaftEnv raftEnv;
typedef struct Tsdb {
uint64_t lastApplyIndex;
void *mem;
void *imm;
void *store;
} Tsdb;
void tsdbWrite(Tsdb *t, char *msg) {}
void *startFunc(void *param) {
SRaftEnv *pSRaftEnv = (SRaftEnv *)param;
int32_t r = raftEnvStart(pSRaftEnv);
assert(r == 0);
return NULL;
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) {
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: index:%llu value:%s \n", index, msg);
Tsdb *t = pFsm->data;
if (index > t->lastApplyIndex) {
// apply value into tsdb
tsdbWrite(t, msg);
// update lastApplyIndex
t->lastApplyIndex = index;
}
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
void *ptr = req->data;
if (status != 0) {
printf("putValueCb error \n");
} else {
printf("putValueCb ok \n");
}
free(ptr);
free(req);
}
void submitValue() {
// prepare value
struct raft_buffer buf;
buf.len = 32;
void *ptr = malloc(buf.len);
buf.base = ptr;
snprintf(buf.base, buf.len, "%ld", time(NULL));
// get raft
struct raft *r = getRaft(&raftEnv, 100);
assert(r != NULL);
// printRaftState(r);
// submit value
struct raft_apply *req = malloc(sizeof(*req));
req->data = ptr;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
int main(int argc, char **argv) {
// init raft env
int r = raftEnvInit(&raftEnv, "127.0.0.1", 38000, "./data");
assert(r == 0);
// start raft env
pthread_t tid;
pthread_create(&tid, NULL, startFunc, &raftEnv);
// wait for start, just for simple
while (raftEnv.isStart != 1) {
sleep(1);
}
// init fsm
struct raft_fsm *pFsm = malloc(sizeof(*pFsm));
pFsm->apply = fsmApplyCb;
Tsdb *tsdb = malloc(sizeof(*tsdb));
pFsm->data = tsdb;
// add vgroup, id = 100, only has one replica
r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm);
assert(r == 0);
// for test: submit a value every second
while (1) {
sleep(1);
submitValue();
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册