diff --git a/contrib/test/traft/CMakeLists.txt b/contrib/test/traft/CMakeLists.txt index d165c6d4dc7166ae0618e6a210da79fc2661e3b7..32e391646acee1ed74c29c164d43159f3d94f0e5 100644 --- a/contrib/test/traft/CMakeLists.txt +++ b/contrib/test/traft/CMakeLists.txt @@ -1,3 +1,2 @@ -add_subdirectory(rebalance_leader) -add_subdirectory(make_cluster) +add_subdirectory(single_node) diff --git a/contrib/test/traft/cluster/Makefile.2 b/contrib/test/traft/cluster/Makefile.2 new file mode 100644 index 0000000000000000000000000000000000000000..0a4f6ff325378c5290e79f709b5e0d8ccaf04f09 --- /dev/null +++ b/contrib/test/traft/cluster/Makefile.2 @@ -0,0 +1,16 @@ +all: + gcc node10000.c -I ../../include/ ../../.libs/libraft.a -o node10000 -luv -llz4 -lpthread -g + gcc node10001.c -I ../../include/ ../../.libs/libraft.a -o node10001 -luv -llz4 -lpthread -g + gcc node10002.c -I ../../include/ ../../.libs/libraft.a -o node10002 -luv -llz4 -lpthread -g + gcc node10000_restart.c -I ../../include/ ../../.libs/libraft.a -o node10000_restart -luv -llz4 -lpthread -g + gcc node10001_restart.c -I ../../include/ ../../.libs/libraft.a -o node10001_restart -luv -llz4 -lpthread -g + gcc node10002_restart.c -I ../../include/ ../../.libs/libraft.a -o node10002_restart -luv -llz4 -lpthread -g +clean: + rm -f node10000 + rm -f node10001 + rm -f node10002 + rm -f node10000_restart + rm -f node10001_restart + rm -f node10002_restart + sh clear.sh + diff --git a/contrib/test/traft/make_cluster/clear.sh b/contrib/test/traft/cluster/clear.sh similarity index 100% rename from contrib/test/traft/make_cluster/clear.sh rename to contrib/test/traft/cluster/clear.sh diff --git a/contrib/test/traft/cluster/node10000.c b/contrib/test/traft/cluster/node10000.c new file mode 100644 index 0000000000000000000000000000000000000000..96e0b067c2c915360a20f76c403246481c2b06de --- /dev/null +++ b/contrib/test/traft/cluster/node10000.c @@ -0,0 +1,117 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + // printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10000, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // add vgroup, id = 100, only has 3 replica. + // array gives the peer replica infomation. + char peers[MAX_PEERS_COUNT][ADDRESS_LEN]; + memset(peers, 0, sizeof(peers)); + snprintf(peers[0], ADDRESS_LEN, "%s", "127.0.0.1:10001"); + snprintf(peers[1], ADDRESS_LEN, "%s", "127.0.0.1:10002"); + uint32_t peersCount = 2; + r = addRaftVoter(&raftEnv, peers, peersCount, 100, pFsm); + assert(r == 0); + + // for test: submit a value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/cluster/node10000_restart.c b/contrib/test/traft/cluster/node10000_restart.c new file mode 100644 index 0000000000000000000000000000000000000000..c9538552ed10df55cc5a1fe28cac6e36953ff7ff --- /dev/null +++ b/contrib/test/traft/cluster/node10000_restart.c @@ -0,0 +1,113 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + // printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10000, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // add vgroup, id = 100, only has 3 replica. + // here only add self. + // peer replica information will restore from wal. + r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm); + assert(r == 0); + + // for test: submit a value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/cluster/node10001.c b/contrib/test/traft/cluster/node10001.c new file mode 100644 index 0000000000000000000000000000000000000000..08636637ac80510c3694a13c24a4e8ca4a3996ab --- /dev/null +++ b/contrib/test/traft/cluster/node10001.c @@ -0,0 +1,117 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + // printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10001, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // add vgroup, id = 100, only has 3 replica. + // array gives the peer replica infomation. + char peers[MAX_PEERS_COUNT][ADDRESS_LEN]; + memset(peers, 0, sizeof(peers)); + snprintf(peers[0], ADDRESS_LEN, "%s", "127.0.0.1:10000"); + snprintf(peers[1], ADDRESS_LEN, "%s", "127.0.0.1:10002"); + uint32_t peersCount = 2; + r = addRaftVoter(&raftEnv, peers, peersCount, 100, pFsm); + assert(r == 0); + + // for test: submit a value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/cluster/node10001_restart.c b/contrib/test/traft/cluster/node10001_restart.c new file mode 100644 index 0000000000000000000000000000000000000000..10d64d76efc7cbd148b31a4c4581f2db218b7c57 --- /dev/null +++ b/contrib/test/traft/cluster/node10001_restart.c @@ -0,0 +1,113 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + // printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10001, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // add vgroup, id = 100, only has 3 replica. + // here only add self. + // peer replica information will restore from wal. + r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm); + assert(r == 0); + + // for test: submit a value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/cluster/node10002.c b/contrib/test/traft/cluster/node10002.c new file mode 100644 index 0000000000000000000000000000000000000000..ebc3598075602e60743c10d68acdf3ff12598d81 --- /dev/null +++ b/contrib/test/traft/cluster/node10002.c @@ -0,0 +1,117 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + // printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10002, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // add vgroup, id = 100, only has 3 replica. + // array gives the peer replica infomation. + char peers[MAX_PEERS_COUNT][ADDRESS_LEN]; + memset(peers, 0, sizeof(peers)); + snprintf(peers[0], ADDRESS_LEN, "%s", "127.0.0.1:10000"); + snprintf(peers[1], ADDRESS_LEN, "%s", "127.0.0.1:10001"); + uint32_t peersCount = 2; + r = addRaftVoter(&raftEnv, peers, peersCount, 100, pFsm); + assert(r == 0); + + // for test: submit a value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/cluster/node10002_restart.c b/contrib/test/traft/cluster/node10002_restart.c new file mode 100644 index 0000000000000000000000000000000000000000..d772e97b2b33faf1f21261730f1d0c292abce3ec --- /dev/null +++ b/contrib/test/traft/cluster/node10002_restart.c @@ -0,0 +1,113 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + // printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10002, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // add vgroup, id = 100, only has 3 replica. + // here only add self. + // peer replica information will restore from wal. + r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm); + assert(r == 0); + + // for test: submit a value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/join_into_vgroup/Makefile.2 b/contrib/test/traft/join_into_vgroup/Makefile.2 new file mode 100644 index 0000000000000000000000000000000000000000..92f5b7269a873d05166324a842aba2987611071b --- /dev/null +++ b/contrib/test/traft/join_into_vgroup/Makefile.2 @@ -0,0 +1,10 @@ +all: + gcc node_follower10000.c -I ../../include/ ../../.libs/libraft.a -o node_follower10000 -luv -llz4 -lpthread -g + gcc node_follower10001.c -I ../../include/ ../../.libs/libraft.a -o node_follower10001 -luv -llz4 -lpthread -g + gcc node_leader10002.c -I ../../include/ ../../.libs/libraft.a -o node_leader10002 -luv -llz4 -lpthread -g +clean: + rm -f node_follower10000 + rm -f node_follower10001 + rm -f node_leader10002 + sh clear.sh + diff --git a/contrib/test/traft/rebalance_leader/clear.sh b/contrib/test/traft/join_into_vgroup/clear.sh similarity index 100% rename from contrib/test/traft/rebalance_leader/clear.sh rename to contrib/test/traft/join_into_vgroup/clear.sh diff --git a/contrib/test/traft/join_into_vgroup/node_follower10000.c b/contrib/test/traft/join_into_vgroup/node_follower10000.c new file mode 100644 index 0000000000000000000000000000000000000000..b684ef199c7aa15a39d0c376b51da3d7f7d25fc6 --- /dev/null +++ b/contrib/test/traft/join_into_vgroup/node_follower10000.c @@ -0,0 +1,113 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10000, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // add one replica + r = addRaftSpare(&raftEnv, 100, pFsm); + assert(r == 0); + + // for test: submit value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/join_into_vgroup/node_follower10001.c b/contrib/test/traft/join_into_vgroup/node_follower10001.c new file mode 100644 index 0000000000000000000000000000000000000000..27acd33917f56b4cb41c989c3db897e748f75a5c --- /dev/null +++ b/contrib/test/traft/join_into_vgroup/node_follower10001.c @@ -0,0 +1,113 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10001, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // add one replica + r = addRaftSpare(&raftEnv, 100, pFsm); + assert(r == 0); + + // for test: submit value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/join_into_vgroup/node_leader10002.c b/contrib/test/traft/join_into_vgroup/node_leader10002.c new file mode 100644 index 0000000000000000000000000000000000000000..b57af7777b26370663f45f82e55b2176c6a27b5a --- /dev/null +++ b/contrib/test/traft/join_into_vgroup/node_leader10002.c @@ -0,0 +1,144 @@ +#include +#include +#include +#include +#include +#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +void joinRaftPeerCb(struct raft_change *req, int status) { + struct raft *r = req->data; + if (status != 0) { + fprintf(stderr, "joinRaftPeerCb error: %s \n", raft_errmsg(r)); + } else { + fprintf(stderr, "joinRaftPeerCb ok \n"); + } + raft_free(req); +} + +int main(int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 10002, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // add one replica + r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm); + assert(r == 0); + + printRaftState(getRaft(&raftEnv, 100)); + + // wait for being leader + while (1) { + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + if (r->state == RAFT_LEADER) { + break; + } + } + + // join peers + r = joinRaftPeer(&raftEnv, 100, "127.0.0.1", 10000, joinRaftPeerCb); + assert(r == 0); + + // wait for join over + sleep(2); + + r = joinRaftPeer(&raftEnv, 100, "127.0.0.1", 10001, joinRaftPeerCb); + assert(r == 0); + + // for test: submit value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/contrib/test/traft/make_cluster/CMakeLists.txt b/contrib/test/traft/make_cluster/CMakeLists.txt deleted file mode 100644 index afd19d5435a9cfa580fa1188b90c053d250ad1f2..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/CMakeLists.txt +++ /dev/null @@ -1,11 +0,0 @@ -add_executable(makeCluster "") -target_sources(makeCluster - PRIVATE - "raftMain.c" - "raftServer.c" - "config.c" - "console.c" - "simpleHash.c" - "util.c" -) -target_link_libraries(makeCluster PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/make_cluster/common.h b/contrib/test/traft/make_cluster/common.h deleted file mode 100644 index df7422033acdc6ccd1f6c2c34c07781edb94c384..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/common.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef TRAFT_COMMON_H -#define TRAFT_COMMON_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include - -#define COMMAND_LEN 512 -#define MAX_CMD_COUNT 10 -#define TOKEN_LEN 128 -#define MAX_PEERS_COUNT 19 - -#define HOST_LEN 64 -#define ADDRESS_LEN (HOST_LEN * 2) -#define BASE_DIR_LEN 128 - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/contrib/test/traft/make_cluster/config.c b/contrib/test/traft/make_cluster/config.c deleted file mode 100644 index 3b96839fd962a98248080584927caed762e52007..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/config.c +++ /dev/null @@ -1,64 +0,0 @@ -#include "config.h" -#include -#include -#include - -void addrToString(const char *host, uint16_t port, char *addr, int len) { snprintf(addr, len, "%s:%hu", host, port); } - -void parseAddr(const char *addr, char *host, int len, uint16_t *port) { - char *tmp = (char *)malloc(strlen(addr) + 1); - strcpy(tmp, addr); - - char *context; - char *separator = ":"; - char *token = strtok_r(tmp, separator, &context); - if (token) { - snprintf(host, len, "%s", token); - } - - token = strtok_r(NULL, separator, &context); - if (token) { - sscanf(token, "%hu", port); - } - - free(tmp); -} - -int parseConf(int argc, char **argv, RaftServerConfig *pConf) { - memset(pConf, 0, sizeof(*pConf)); - - int option_index, option_value; - option_index = 0; - static struct option long_options[] = {{"help", no_argument, NULL, 'h'}, - {"addr", required_argument, NULL, 'a'}, - {"dir", required_argument, NULL, 'd'}, - {NULL, 0, NULL, 0}}; - - while ((option_value = getopt_long(argc, argv, "ha:d:", long_options, &option_index)) != -1) { - switch (option_value) { - case 'a': { - parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); - break; - } - - case 'd': { - snprintf(pConf->baseDir, sizeof(pConf->baseDir), "%s", optarg); - break; - } - - case 'h': { - return -2; - } - - default: { return -2; } - } - } - - return 0; -} - -void printConf(RaftServerConfig *pConf) { - printf("\n---printConf: \n"); - printf("me: [%s:%hu] \n", pConf->me.host, pConf->me.port); - printf("dataDir: [%s] \n\n", pConf->baseDir); -} diff --git a/contrib/test/traft/make_cluster/config.h b/contrib/test/traft/make_cluster/config.h deleted file mode 100644 index 13c43d0d285364065b558a52d2e6292ee21c5af2..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/config.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef TRAFT_CONFIG_H -#define TRAFT_CONFIG_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include "common.h" - -typedef struct { - char host[HOST_LEN]; - uint16_t port; -} Addr; - -typedef struct { - Addr me; - char baseDir[BASE_DIR_LEN]; -} RaftServerConfig; - -void addrToString(const char *host, uint16_t port, char *addr, int len); -void parseAddr(const char *addr, char *host, int len, uint16_t *port); -int parseConf(int argc, char **argv, RaftServerConfig *pConf); -void printConf(RaftServerConfig *pConf); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/contrib/test/traft/make_cluster/console.c b/contrib/test/traft/make_cluster/console.c deleted file mode 100644 index b00550c681f33ed6807ce9c97bc6c24b600c9526..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/console.c +++ /dev/null @@ -1,202 +0,0 @@ -#include "console.h" -#include -#include -#include -#include "raftServer.h" -#include "util.h" - -void printHelp() { - printf("---------------------\n"); - printf("help: \n\n"); - printf("create a vgroup with 3 replicas: \n"); - printf("create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 \n"); - printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10002 \n"); - printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10001 \n"); - printf("\n"); - printf("create a vgroup with only one replica: \n"); - printf("create vnode voter vid 200 \n"); - printf("\n"); - printf("add vnode into vgroup: \n"); - printf("create vnode spare vid 100 ---- run at 127.0.0.1:10003\n"); - printf("join vnode vid 100 addr 127.0.0.1:10003 ---- run at leader of vgroup 100\n"); - printf("\n"); - printf("run \n"); - printf("put 0 key value \n"); - printf("get 0 key \n"); - printf("---------------------\n"); -} - -void console(RaftServer *pRaftServer) { - while (1) { - int ret; - char cmdBuf[COMMAND_LEN]; - memset(cmdBuf, 0, sizeof(cmdBuf)); - printf("(console)> "); - char *retp = fgets(cmdBuf, COMMAND_LEN, stdin); - if (!retp) { - exit(-1); - } - - int pos = strlen(cmdBuf); - if (cmdBuf[pos - 1] == '\n') { - cmdBuf[pos - 1] = '\0'; - } - - if (strncmp(cmdBuf, "", COMMAND_LEN) == 0) { - continue; - } - - char cmds[MAX_CMD_COUNT][TOKEN_LEN]; - memset(cmds, 0, sizeof(cmds)); - - int cmdCount; - cmdCount = splitString(cmdBuf, " ", cmds, MAX_CMD_COUNT); - - if (strcmp(cmds[0], "create") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[3], "vid") == 0) { - uint16_t vid; - sscanf(cmds[4], "%hu", &vid); - - if (strcmp(cmds[2], "voter") == 0) { - char peers[MAX_PEERS_COUNT][ADDRESS_LEN]; - memset(peers, 0, sizeof(peers)); - uint32_t peersCount = 0; - - if (strcmp(cmds[5], "peers") == 0 && cmdCount > 6) { - // create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 - for (int i = 6; i < cmdCount; ++i) { - snprintf(peers[i - 6], ADDRESS_LEN, "%s", cmds[i]); - peersCount++; - } - } else { - // create vnode voter vid 200 - } - ret = addRaftVoter(pRaftServer, peers, peersCount, vid); - if (ret == 0) { - printf("create vnode voter ok \n"); - } else { - printf("create vnode voter error \n"); - } - } else if (strcmp(cmds[2], "spare") == 0) { - ret = addRaftSpare(pRaftServer, vid); - if (ret == 0) { - printf("create vnode spare ok \n"); - } else { - printf("create vnode spare error \n"); - } - } else { - printHelp(); - } - - } else if (strcmp(cmds[0], "join") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[2], "vid") == 0 && - strcmp(cmds[4], "addr") == 0 && cmdCount == 6) { - // join vnode vid 100 addr 127.0.0.1:10004 - - char * address = cmds[5]; - char host[64]; - uint16_t port; - parseAddr(address, host, sizeof(host), &port); - - uint16_t vid; - sscanf(cmds[3], "%hu", &vid); - - HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); - if (*pp == NULL) { - printf("vid:%hu not found \n", vid); - break; - } - RaftInstance *pRaftInstance = (*pp)->data; - - uint64_t destRaftId = encodeRaftId(host, port, vid); - - struct raft_change *req = raft_malloc(sizeof(*req)); - RaftJoin * pRaftJoin = raft_malloc(sizeof(*pRaftJoin)); - pRaftJoin->r = &pRaftInstance->raft; - pRaftJoin->joinId = destRaftId; - req->data = pRaftJoin; - ret = raft_add(&pRaftInstance->raft, req, destRaftId, address, raftChangeAddCb); - if (ret != 0) { - printf("raft_add error: %s \n", raft_errmsg(&pRaftInstance->raft)); - } - - } else if (strcmp(cmds[0], "dropnode") == 0) { - } else if (strcmp(cmds[0], "state") == 0) { - pRaftServer->raftInstances.print(&pRaftServer->raftInstances); - for (size_t i = 0; i < pRaftServer->raftInstances.length; ++i) { - HashNode *ptr = pRaftServer->raftInstances.table[i]; - if (ptr != NULL) { - while (ptr != NULL) { - RaftInstance *pRaftInstance = ptr->data; - printf("instance vid:%hu raftId:%llu \n", ptr->vgroupId, pRaftInstance->raftId); - printRaftState(&pRaftInstance->raft); - printf("\n"); - ptr = ptr->next; - } - printf("\n"); - } - } - - } else if (strcmp(cmds[0], "put") == 0 && cmdCount == 4) { - uint16_t vid; - sscanf(cmds[1], "%hu", &vid); - char * key = cmds[2]; - char * value = cmds[3]; - HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); - if (*pp == NULL) { - printf("vid:%hu not found \n", vid); - break; - } - RaftInstance *pRaftInstance = (*pp)->data; - - char *raftValue = malloc(TOKEN_LEN * 2 + 3); - snprintf(raftValue, TOKEN_LEN * 2 + 3, "%s--%s", key, value); - putValue(&pRaftInstance->raft, raftValue); - free(raftValue); - - } else if (strcmp(cmds[0], "run") == 0) { - pthread_t tidRaftServer; - pthread_create(&tidRaftServer, NULL, startServerFunc, pRaftServer); - - } else if (strcmp(cmds[0], "get") == 0 && cmdCount == 3) { - uint16_t vid; - sscanf(cmds[1], "%hu", &vid); - char * key = cmds[2]; - HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); - if (*pp == NULL) { - printf("vid:%hu not found \n", vid); - break; - } - RaftInstance * pRaftInstance = (*pp)->data; - SimpleHash * pKV = pRaftInstance->fsm.data; - SimpleHashNode **ppNode = pKV->find_cstr(pKV, key); - if (*ppNode == NULL) { - printf("key:%s not found \n", key); - } else { - printf("find key:%s value:%s \n", key, (char *)((*ppNode)->data)); - } - - } else if (strcmp(cmds[0], "transfer") == 0) { - } else if (strcmp(cmds[0], "state") == 0) { - } else if (strcmp(cmds[0], "snapshot") == 0) { - } else if (strcmp(cmds[0], "exit") == 0) { - exit(0); - - } else if (strcmp(cmds[0], "quit") == 0) { - exit(0); - - } else if (strcmp(cmds[0], "help") == 0) { - printHelp(); - - } else { - printf("unknown command: %s \n", cmdBuf); - printHelp(); - } - - /* - printf("cmdBuf: [%s] \n", cmdBuf); - printf("cmdCount : %d \n", cmdCount); - for (int i = 0; i < MAX_CMD_COUNT; ++i) { - printf("cmd%d : %s \n", i, cmds[i]); - } - */ - } -} diff --git a/contrib/test/traft/make_cluster/console.h b/contrib/test/traft/make_cluster/console.h deleted file mode 100644 index f9ed12baf5e9c674f906c78de38c39fd80d1e71f..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/console.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef TRAFT_CONSOLE_H -#define TRAFT_CONSOLE_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include "common.h" -#include "raftServer.h" - -void console(RaftServer *pRaftServer); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/contrib/test/traft/make_cluster/raftMain.c b/contrib/test/traft/make_cluster/raftMain.c deleted file mode 100644 index e25636de91ae5d634e3341b4b70a1868088a751d..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/raftMain.c +++ /dev/null @@ -1,81 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "common.h" -#include "config.h" -#include "console.h" -#include "raftServer.h" -#include "simpleHash.h" -#include "util.h" - -const char *exe_name; - -void *startConsoleFunc(void *param) { - RaftServer *pRaftServer = (RaftServer *)param; - console(pRaftServer); - return NULL; -} - -void usage() { - printf("\nusage: \n"); - printf("%s --addr=127.0.0.1:10000 --dir=./data \n", exe_name); - printf("\n"); -} - -RaftServerConfig gConfig; -RaftServer gRaftServer; - -int main(int argc, char **argv) { - srand(time(NULL)); - int32_t ret; - - exe_name = argv[0]; - if (argc < 3) { - usage(); - exit(-1); - } - - ret = parseConf(argc, argv, &gConfig); - if (ret != 0) { - usage(); - exit(-1); - } - printConf(&gConfig); - - if (!dirOK(gConfig.baseDir)) { - ret = mkdir(gConfig.baseDir, 0775); - if (ret != 0) { - fprintf(stderr, "mkdir error, %s \n", gConfig.baseDir); - exit(-1); - } - } - - ret = raftServerInit(&gRaftServer, &gConfig); - if (ret != 0) { - fprintf(stderr, "raftServerInit error \n"); - exit(-1); - } - - /* - pthread_t tidRaftServer; - pthread_create(&tidRaftServer, NULL, startServerFunc, &gRaftServer); - */ - - pthread_t tidConsole; - pthread_create(&tidConsole, NULL, startConsoleFunc, &gRaftServer); - - while (1) { - sleep(10); - } - - return 0; -} diff --git a/contrib/test/traft/make_cluster/raftServer.c b/contrib/test/traft/make_cluster/raftServer.c deleted file mode 100644 index bbf67b94207d73c0d7f56157072f9a506f1ec1bd..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/raftServer.c +++ /dev/null @@ -1,286 +0,0 @@ -#include "raftServer.h" -#include -#include -#include "common.h" -#include "simpleHash.h" -#include "util.h" - -void *startServerFunc(void *param) { - RaftServer *pRaftServer = (RaftServer *)param; - int32_t r = raftServerStart(pRaftServer); - assert(r == 0); - return NULL; -} - -void raftChangeAssignCb(struct raft_change *req, int status) { - struct raft *r = req->data; - if (status != 0) { - printf("raftChangeAssignCb error: %s \n", raft_errmsg(r)); - } else { - printf("raftChangeAssignCb ok \n"); - } - raft_free(req); -} - -void raftChangeAddCb(struct raft_change *req, int status) { - RaftJoin *pRaftJoin = req->data; - if (status != 0) { - printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r)); - } else { - struct raft_change *req2 = raft_malloc(sizeof(*req2)); - req2->data = pRaftJoin->r; - int ret = raft_assign(pRaftJoin->r, req2, pRaftJoin->joinId, RAFT_VOTER, raftChangeAssignCb); - if (ret != 0) { - printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r)); - } - } - raft_free(req->data); - raft_free(req); -} - -int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { - // get fsm data - SimpleHash *sh = pFsm->data; - - // get commit value - char *msg = (char *)buf->base; - printf("fsm apply: [%s] \n", msg); - char arr[2][TOKEN_LEN]; - int r = splitString(msg, "--", arr, 2); - assert(r == 2); - - // do the value on fsm - sh->insert_cstr(sh, arr[0], arr[1]); - - raft_free(buf->base); - return 0; -} - -void putValueCb(struct raft_apply *req, int status, void *result) { - struct raft *r = req->data; - if (status != 0) { - printf("putValueCb error: %s \n", raft_errmsg(r)); - } else { - printf("putValueCb: %s \n", "ok"); - } - raft_free(req); -} - -void putValue(struct raft *r, const char *value) { - struct raft_buffer buf; - - buf.len = strlen(value) + 1; - buf.base = raft_malloc(buf.len); - snprintf(buf.base, buf.len, "%s", value); - - struct raft_apply *req = raft_malloc(sizeof(*req)); - req->data = r; - int ret = raft_apply(r, req, &buf, 1, putValueCb); - if (ret == 0) { - printf("put %s \n", (char *)buf.base); - } else { - printf("put error: %s \n", raft_errmsg(r)); - } -} - -const char *state2String(unsigned short state) { - if (state == RAFT_UNAVAILABLE) { - return "RAFT_UNAVAILABLE"; - - } else if (state == RAFT_FOLLOWER) { - return "RAFT_FOLLOWER"; - - } else if (state == RAFT_CANDIDATE) { - return "RAFT_CANDIDATE"; - - } else if (state == RAFT_LEADER) { - return "RAFT_LEADER"; - } - return "UNKNOWN_RAFT_STATE"; -} - -void printRaftConfiguration(struct raft_configuration *c) { - printf("configuration: \n"); - for (int i = 0; i < c->n; ++i) { - printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address); - } -} - -void printRaftState(struct raft *r) { - printf("----Raft State: -----------\n"); - printf("mem_addr: %p \n", r); - printf("my_id: %llu \n", r->id); - printf("address: %s \n", r->address); - printf("current_term: %llu \n", r->current_term); - printf("voted_for: %llu \n", r->voted_for); - printf("role: %s \n", state2String(r->state)); - printf("commit_index: %llu \n", r->commit_index); - printf("last_applied: %llu \n", r->last_applied); - printf("last_stored: %llu \n", r->last_stored); - - printf("configuration_index: %llu \n", r->configuration_index); - printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); - printRaftConfiguration(&r->configuration); - - printf("----------------------------\n"); -} - -int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid) { - int ret; - - RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance)); - assert(pRaftInstance != NULL); - - // init raftId - pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); - - // init dir - snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir, - pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId); - - if (!dirOK(pRaftInstance->dir)) { - ret = mkdir(pRaftInstance->dir, 0775); - if (ret != 0) { - fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir); - assert(0); - } - } - - // init fsm - pRaftInstance->fsm.data = newSimpleHash(2); - pRaftInstance->fsm.apply = fsmApplyCb; - - // init io - ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport); - if (ret != 0) { - fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); - assert(0); - } - - // init raft - ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId, - pRaftServer->address); - if (ret != 0) { - fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); - assert(0); - } - - // init raft_configuration - struct raft_configuration conf; - raft_configuration_init(&conf); - raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_VOTER); - for (int i = 0; i < peersCount; ++i) { - char * peerAddress = peers[i]; - char host[64]; - uint16_t port; - parseAddr(peerAddress, host, sizeof(host), &port); - uint64_t raftId = encodeRaftId(host, port, vid); - raft_configuration_add(&conf, raftId, peers[i], RAFT_VOTER); - } - raft_bootstrap(&pRaftInstance->raft, &conf); - - // start raft - ret = raft_start(&pRaftInstance->raft); - if (ret != 0) { - fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft)); - assert(0); - } - - // add raft instance into raft server - pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance); - - return 0; -} - -int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid) { - int ret; - - RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance)); - assert(pRaftInstance != NULL); - - // init raftId - pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); - - // init dir - snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir, - pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId); - ret = mkdir(pRaftInstance->dir, 0775); - if (ret != 0) { - fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir); - assert(0); - } - - // init fsm - pRaftInstance->fsm.data = newSimpleHash(2); - pRaftInstance->fsm.apply = fsmApplyCb; - - // init io - ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport); - if (ret != 0) { - fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); - assert(0); - } - - // init raft - ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId, - pRaftServer->address); - if (ret != 0) { - fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); - assert(0); - } - - // init raft_configuration - struct raft_configuration conf; - raft_configuration_init(&conf); - raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_SPARE); - raft_bootstrap(&pRaftInstance->raft, &conf); - - // start raft - ret = raft_start(&pRaftInstance->raft); - if (ret != 0) { - fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft)); - assert(0); - } - - // add raft instance into raft server - pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance); - - return 0; -} - -int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf) { - int ret; - - // init host, port, address, dir - snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host); - pRaftServer->port = pConf->me.port; - snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port); - snprintf(pRaftServer->baseDir, sizeof(pRaftServer->baseDir), "%s", pConf->baseDir); - - // init loop - ret = uv_loop_init(&pRaftServer->loop); - if (ret != 0) { - fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret)); - assert(0); - } - - // init network - ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); - if (ret != 0) { - fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret); - assert(0); - } - - // init raft instance container - initIdHash(&pRaftServer->raftInstances, 2); - - return 0; -} - -int32_t raftServerStart(RaftServer *pRaftServer) { - // start loop - uv_run(&pRaftServer->loop, UV_RUN_DEFAULT); - return 0; -} - -void raftServerStop(RaftServer *pRaftServer) {} diff --git a/contrib/test/traft/make_cluster/raftServer.h b/contrib/test/traft/make_cluster/raftServer.h deleted file mode 100644 index b6dbddb2b7b6125db0eb8443aa1eb0be8954fe26..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/raftServer.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef TDENGINE_RAFT_SERVER_H -#define TDENGINE_RAFT_SERVER_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include -#include -#include "common.h" -#include "config.h" -#include "raft.h" -#include "raft/uv.h" -#include "simpleHash.h" - -typedef struct RaftJoin { - struct raft *r; - raft_id joinId; -} RaftJoin; - -typedef struct { - raft_id raftId; - char dir[BASE_DIR_LEN * 2]; - struct raft_fsm fsm; - struct raft_io io; - struct raft raft; -} RaftInstance; - -typedef struct { - char host[HOST_LEN]; - uint16_t port; - char address[ADDRESS_LEN]; /* Raft instance address */ - char baseDir[BASE_DIR_LEN]; /* Raft instance address */ - - struct uv_loop_s loop; /* UV loop */ - struct raft_uv_transport transport; /* UV I/O backend transport */ - - IdHash raftInstances; /* multi raft instances. traft use IdHash to manager multi vgroup inside, here we can use IdHash - too. */ -} RaftServer; - -void * startServerFunc(void *param); -int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid); -int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid); - -int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf); -int32_t raftServerStart(RaftServer *pRaftServer); -void raftServerStop(RaftServer *pRaftServer); - -int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result); -void putValueCb(struct raft_apply *req, int status, void *result); -void putValue(struct raft *r, const char *value); - -void raftChangeAddCb(struct raft_change *req, int status); - -const char *state2String(unsigned short state); -void printRaftConfiguration(struct raft_configuration *c); -void printRaftState(struct raft *r); - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_RAFT_SERVER_H diff --git a/contrib/test/traft/make_cluster/simpleHash.c b/contrib/test/traft/make_cluster/simpleHash.c deleted file mode 100644 index 6694843874682cece47231f46a0ea4c7c355924c..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/simpleHash.c +++ /dev/null @@ -1,218 +0,0 @@ -#include "simpleHash.h" - -uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed) { - // Similar to murmur hash - const uint32_t m = 0xc6a4a793; - const uint32_t r = 24; - const char* limit = data + n; - uint32_t h = seed ^ (n * m); - - // Pick up four bytes at a time - while (data + 4 <= limit) { - // uint32_t w = DecodeFixed32(data); - uint32_t w; - memcpy(&w, data, 4); - - data += 4; - h += w; - h *= m; - h ^= (h >> 16); - } - - // Pick up remaining bytes - switch (limit - data) { - case 3: - h += (unsigned char)(data[2]) << 16; - do { - } while (0); - case 2: - h += (unsigned char)(data[1]) << 8; - do { - } while (0); - case 1: - h += (unsigned char)(data[0]); - h *= m; - h ^= (h >> r); - break; - } - return h; -} - -int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data) { - return insertSimpleHash(ths, key, strlen(key) + 1, data, strlen(data) + 1); -} - -int removeCStrSimpleHash(struct SimpleHash* ths, char* key) { return removeSimpleHash(ths, key, strlen(key) + 1); } - -SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key) { - return findSimpleHash(ths, key, strlen(key) + 1); -} - -int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen) { - SimpleHashNode** pp = ths->find(ths, key, keyLen); - if (*pp != NULL) { - fprintf(stderr, "insertSimpleHash, already has key \n"); - return -1; - } - - SimpleHashNode* node = malloc(sizeof(*node)); - node->hashCode = ths->hashFunc(key, keyLen); - node->key = malloc(keyLen); - node->keyLen = keyLen; - memcpy(node->key, key, keyLen); - node->data = malloc(dataLen); - node->dataLen = dataLen; - memcpy(node->data, data, dataLen); - node->next = NULL; - - // printf("insertSimpleHash: <%s, %ld, %s, %ld, %u> \n", node->key, node->keyLen, node->data, node->dataLen, - // node->hashCode); - - size_t index = node->hashCode & (ths->length - 1); - - SimpleHashNode* ptr = ths->table[index]; - if (ptr != NULL) { - node->next = ptr; - ths->table[index] = node; - - } else { - ths->table[index] = node; - } - ths->elems++; - if (ths->elems > 2 * ths->length) { - ths->resize(ths); - } - - return 0; -} - -int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) { - SimpleHashNode** pp = ths->find(ths, key, keyLen); - if (*pp == NULL) { - fprintf(stderr, "removeSimpleHash, key not exist \n"); - return -1; - } - - SimpleHashNode* del = *pp; - *pp = del->next; - free(del->key); - free(del->data); - free(del); - ths->elems--; - - return 0; -} - -SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) { - uint32_t hashCode = ths->hashFunc(key, keyLen); - // size_t index = hashCode % ths->length; - size_t index = hashCode & (ths->length - 1); - - // printf("findSimpleHash: %s %ld %u \n", key, keyLen, hashCode); - - SimpleHashNode** pp = &(ths->table[index]); - while (*pp != NULL && ((*pp)->hashCode != hashCode || memcmp(key, (*pp)->key, keyLen) != 0)) { - pp = &((*pp)->next); - } - - return pp; -} - -void printCStrSimpleHash(struct SimpleHash* ths) { - printf("\n--- printCStrSimpleHash: elems:%d length:%d \n", ths->elems, ths->length); - for (size_t i = 0; i < ths->length; ++i) { - SimpleHashNode* ptr = ths->table[i]; - if (ptr != NULL) { - printf("%zu: ", i); - while (ptr != NULL) { - printf("<%u, %s, %ld, %s, %ld> ", ptr->hashCode, (char*)ptr->key, ptr->keyLen, (char*)ptr->data, ptr->dataLen); - ptr = ptr->next; - } - printf("\n"); - } - } - printf("---------------\n"); -} - -void destroySimpleHash(struct SimpleHash* ths) { - for (size_t i = 0; i < ths->length; ++i) { - SimpleHashNode* ptr = ths->table[i]; - while (ptr != NULL) { - SimpleHashNode* tmp = ptr; - ptr = ptr->next; - free(tmp->key); - free(tmp->data); - free(tmp); - } - } - - ths->length = 0; - ths->elems = 0; - free(ths->table); - free(ths); -} - -void resizeSimpleHash(struct SimpleHash* ths) { - uint32_t new_length = ths->length; - while (new_length < ths->elems) { - new_length *= 2; - } - - printf("resizeSimpleHash: %p from %u to %u \n", ths, ths->length, new_length); - - SimpleHashNode** new_table = malloc(new_length * sizeof(SimpleHashNode*)); - memset(new_table, 0, new_length * sizeof(SimpleHashNode*)); - - uint32_t count = 0; - for (uint32_t i = 0; i < ths->length; i++) { - if (ths->table[i] == NULL) { - continue; - } - - SimpleHashNode* it = ths->table[i]; - while (it != NULL) { - SimpleHashNode* move_node = it; - it = it->next; - - // move move_node - move_node->next = NULL; - size_t index = move_node->hashCode & (new_length - 1); - - SimpleHashNode* ptr = new_table[index]; - if (ptr != NULL) { - move_node->next = ptr; - new_table[index] = move_node; - } else { - new_table[index] = move_node; - } - count++; - } - } - - assert(ths->elems == count); - free(ths->table); - ths->table = new_table; - ths->length = new_length; -} - -uint32_t simpleHashFunc(const char* key, size_t keyLen) { return mySimpleHash(key, keyLen, 1); } - -struct SimpleHash* newSimpleHash(size_t length) { - struct SimpleHash* ths = malloc(sizeof(*ths)); - - ths->length = length; - ths->elems = 0; - ths->table = malloc(length * sizeof(SimpleHashNode*)); - memset(ths->table, 0, length * sizeof(SimpleHashNode*)); - - ths->insert = insertSimpleHash; - ths->remove = removeSimpleHash; - ths->find = findSimpleHash; - ths->insert_cstr = insertCStrSimpleHash; - ths->remove_cstr = removeCStrSimpleHash; - ths->find_cstr = findCStrSimpleHash; - ths->print_cstr = printCStrSimpleHash; - ths->destroy = destroySimpleHash; - ths->resize = resizeSimpleHash; - ths->hashFunc = simpleHashFunc; -} diff --git a/contrib/test/traft/make_cluster/simpleHash.h b/contrib/test/traft/make_cluster/simpleHash.h deleted file mode 100644 index c6fcd93888ed5b53b5c4e1abcde3c38a5fe642dc..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/simpleHash.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef __SIMPLE_HASH_H__ -#define __SIMPLE_HASH_H__ - -#include -#include -#include -#include -#include - -uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed); - -typedef struct SimpleHashNode { - uint32_t hashCode; - void* key; - size_t keyLen; - void* data; - size_t dataLen; - struct SimpleHashNode* next; -} SimpleHashNode; - -typedef struct SimpleHash { - // public: - - int (*insert)(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen); - int (*remove)(struct SimpleHash* ths, char* key, size_t keyLen); - SimpleHashNode** (*find)(struct SimpleHash* ths, char* key, size_t keyLen); - - // wrapper - int (*insert_cstr)(struct SimpleHash* ths, char* key, char* data); - int (*remove_cstr)(struct SimpleHash* ths, char* key); - SimpleHashNode** (*find_cstr)(struct SimpleHash* ths, char* key); - - void (*print_cstr)(struct SimpleHash* ths); - void (*destroy)(struct SimpleHash* ths); - - uint32_t length; - uint32_t elems; - - // private: - void (*resize)(struct SimpleHash* ths); - uint32_t (*hashFunc)(const char* key, size_t keyLen); - - SimpleHashNode** table; - -} SimpleHash; - -int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data); -int removeCStrSimpleHash(struct SimpleHash* ths, char* key); -SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key); -void printCStrSimpleHash(struct SimpleHash* ths); - -int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen); -int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen); -SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen); -void destroySimpleHash(struct SimpleHash* ths); -void resizeSimpleHash(struct SimpleHash* ths); -uint32_t simpleHashFunc(const char* key, size_t keyLen); - -struct SimpleHash* newSimpleHash(size_t length); - -#endif diff --git a/contrib/test/traft/make_cluster/util.c b/contrib/test/traft/make_cluster/util.c deleted file mode 100644 index ff704f366025b4d38343ec644cfea8f22b625414..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/util.c +++ /dev/null @@ -1,45 +0,0 @@ -#include "util.h" -#include -#include -#include - -int dirOK(const char *path) { - DIR *dir = opendir(path); - if (dir != NULL) { - closedir(dir); - return 1; - } else { - return 0; - } -} - -int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr) { - if (n_arr <= 0) { - return -1; - } - - char *tmp = (char *)malloc(strlen(str) + 1); - strcpy(tmp, str); - char *context; - int n = 0; - - char *token = strtok_r(tmp, separator, &context); - if (!token) { - goto ret; - } - strncpy(arr[n], token, TOKEN_LEN); - n++; - - while (1) { - token = strtok_r(NULL, separator, &context); - if (!token || n >= n_arr) { - goto ret; - } - strncpy(arr[n], token, TOKEN_LEN); - n++; - } - -ret: - free(tmp); - return n; -} diff --git a/contrib/test/traft/make_cluster/util.h b/contrib/test/traft/make_cluster/util.h deleted file mode 100644 index fb4ccb9c5c898688fde8335da5f3fccb39b08075..0000000000000000000000000000000000000000 --- a/contrib/test/traft/make_cluster/util.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef TRAFT_UTIL_H -#define TRAFT_UTIL_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "common.h" - -int dirOK(const char *path); -int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/contrib/test/traft/rebalance_leader/CMakeLists.txt b/contrib/test/traft/rebalance_leader/CMakeLists.txt deleted file mode 100644 index 92640bdd80f380913ad2c985b443b473e36dae60..0000000000000000000000000000000000000000 --- a/contrib/test/traft/rebalance_leader/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -add_executable(rebalanceLeader "") -target_sources(rebalanceLeader - PRIVATE - "raftMain.c" - "raftServer.c" -) -target_link_libraries(rebalanceLeader PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/rebalance_leader/common.h b/contrib/test/traft/rebalance_leader/common.h deleted file mode 100644 index 0229c29cf75e709d2441ed36c153ee41a29cfacc..0000000000000000000000000000000000000000 --- a/contrib/test/traft/rebalance_leader/common.h +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef TDENGINE_COMMON_H -#define TDENGINE_COMMON_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include - -#define MAX_INSTANCE_NUM 100 - -#define MAX_PEERS 10 -#define COMMAND_LEN 1024 -#define TOKEN_LEN 128 -#define DIR_LEN 256 -#define HOST_LEN 64 -#define ADDRESS_LEN (HOST_LEN + 16) - -typedef struct { - char host[HOST_LEN]; - uint32_t port; -} Addr; - -typedef struct { - Addr me; - Addr peers[MAX_PEERS]; - int peersCount; - char dir[DIR_LEN]; - char dataDir[DIR_LEN + HOST_LEN * 2]; -} SRaftServerConfig; - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_COMMON_H diff --git a/contrib/test/traft/rebalance_leader/raftMain.c b/contrib/test/traft/rebalance_leader/raftMain.c deleted file mode 100644 index 70dc191997ba208cf67fc44e116da1d8bf8c8681..0000000000000000000000000000000000000000 --- a/contrib/test/traft/rebalance_leader/raftMain.c +++ /dev/null @@ -1,678 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "raftServer.h" -#include "common.h" - -const char *exe_name; - -typedef struct LeaderState { - char address[48]; - int leaderCount; - -} LeaderState; - -#define NODE_COUNT 3 -LeaderState leaderStates[NODE_COUNT]; - -void printLeaderCount() { - for (int i = 0; i < NODE_COUNT; ++i) { - printf("%s: leaderCount:%d \n", leaderStates[i].address, leaderStates[i].leaderCount); - } -} - -void updateLeaderStates(SRaftServer *pRaftServer) { - for (int i = 0; i < pRaftServer->instance[0].raft.configuration.n; ++i) { - snprintf(leaderStates[i].address, sizeof(leaderStates[i].address), "%s", pRaftServer->instance[0].raft.configuration.servers[i].address); - leaderStates[i].leaderCount = 0; - } - - for (int i = 0; i < pRaftServer->instanceCount; ++i) { - struct raft *r = &pRaftServer->instance[i].raft; - - char leaderAddress[128]; - memset(leaderAddress, 0, sizeof(leaderAddress)); - - if (r->state == RAFT_LEADER) { - snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address); - } else if (r->state == RAFT_FOLLOWER) { - snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address); - } - - for (int j = 0; j < NODE_COUNT; j++) { - if (strcmp(leaderAddress, leaderStates[j].address) == 0) { - leaderStates[j].leaderCount++; - } - } - } -} - - -void raftTransferCb(struct raft_transfer *req) { - SRaftServer *pRaftServer = req->data; - raft_free(req); - - //printf("raftTransferCb: \n"); - updateLeaderStates(pRaftServer); - //printLeaderCount(); - - int myLeaderCount; - for (int i = 0; i < NODE_COUNT; ++i) { - if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { - myLeaderCount = leaderStates[i].leaderCount; - } - } - - //printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); - if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { - struct raft *r; - for (int j = 0; j < pRaftServer->instanceCount; ++j) { - if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { - r = &pRaftServer->instance[j].raft; - break; - } - } - - struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); - transfer->data = pRaftServer; - - uint64_t destRaftId; - int minIndex = -1; - int minLeaderCount = myLeaderCount; - for (int j = 0; j < NODE_COUNT; ++j) { - if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) { - continue; - } - - if (leaderStates[j].leaderCount <= minLeaderCount) { - minLeaderCount = leaderStates[j].leaderCount; - minIndex = j; - } - } - - - char myHost[48]; - uint16_t myPort; - uint16_t myVid; - decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); - - - //printf("raftTransferCb transfer leader: vid[%u] choose: index:%d, leaderStates[%d].address:%s, leaderStates[%d].leaderCount:%d \n", minIndex, minIndex, leaderStates[minIndex].address, minIndex, leaderStates[minIndex].leaderCount); - - char *destAddress = leaderStates[minIndex].address; - - char tokens[MAX_PEERS][MAX_TOKEN_LEN]; - splitString(destAddress, ":", tokens, 2); - char *destHost = tokens[0]; - uint16_t destPort = atoi(tokens[1]); - destRaftId = encodeRaftId(destHost, destPort, myVid); - - printf("\nraftTransferCb transfer leader: vgroupId:%u from:%s:%u --> to:%s:%u ", myVid, myHost, myPort, destHost, destPort); - fflush(stdout); - - raft_transfer(r, transfer, destRaftId, raftTransferCb); - } - -} - - -void parseAddr(const char *addr, char *host, int len, uint32_t *port) { - char* tmp = (char*)malloc(strlen(addr) + 1); - strcpy(tmp, addr); - - char* context; - char* separator = ":"; - char* token = strtok_r(tmp, separator, &context); - if (token) { - snprintf(host, len, "%s", token); - } - - token = strtok_r(NULL, separator, &context); - if (token) { - sscanf(token, "%u", port); - } - - free(tmp); -} - -// only parse 3 tokens -int parseCommand3(const char* str, char* token1, char* token2, char* token3, int len) -{ - char* tmp = (char*)malloc(strlen(str) + 1); - strcpy(tmp, str); - - char* context; - char* separator = " "; - int n = 0; - - char* token = strtok_r(tmp, separator, &context); - if (!token) { - goto ret; - } - if (strcmp(token, "") != 0) { - strncpy(token1, token, len); - n++; - } - - token = strtok_r(NULL, separator, &context); - if (!token) { - goto ret; - } - if (strcmp(token, "") != 0) { - strncpy(token2, token, len); - n++; - } - - token = strtok_r(NULL, separator, &context); - if (!token) { - goto ret; - } - if (strcmp(token, "") != 0) { - strncpy(token3, token, len); - n++; - } - -ret: - return n; - free(tmp); -} - -// only parse 4 tokens -int parseCommand4(const char* str, char* token1, char* token2, char* token3, char *token4, int len) -{ - char* tmp = (char*)malloc(strlen(str) + 1); - strcpy(tmp, str); - - char* context; - char* separator = " "; - int n = 0; - - char* token = strtok_r(tmp, separator, &context); - if (!token) { - goto ret; - } - if (strcmp(token, "") != 0) { - strncpy(token1, token, len); - n++; - } - - token = strtok_r(NULL, separator, &context); - if (!token) { - goto ret; - } - if (strcmp(token, "") != 0) { - strncpy(token2, token, len); - n++; - } - - token = strtok_r(NULL, separator, &context); - if (!token) { - goto ret; - } - if (strcmp(token, "") != 0) { - strncpy(token3, token, len); - n++; - } - - token = strtok_r(NULL, separator, &context); - if (!token) { - goto ret; - } - if (strcmp(token, "") != 0) { - strncpy(token4, token, len); - n++; - } - -ret: - return n; - free(tmp); -} - -void *startServerFunc(void *param) { - SRaftServer *pServer = (SRaftServer*)param; - int32_t r = raftServerStart(pServer); - assert(r == 0); - - return NULL; -} - -// Console --------------------------------- -const char* state2String(unsigned short state) { - if (state == RAFT_UNAVAILABLE) { - return "RAFT_UNAVAILABLE"; - - } else if (state == RAFT_FOLLOWER) { - return "RAFT_FOLLOWER"; - - } else if (state == RAFT_CANDIDATE) { - return "RAFT_CANDIDATE"; - - } else if (state == RAFT_LEADER) { - return "RAFT_LEADER"; - - } - return "UNKNOWN_RAFT_STATE"; -} - - -void printRaftState2(struct raft *r) { - char leaderAddress[128]; - memset(leaderAddress, 0, sizeof(leaderAddress)); - - if (r->state == RAFT_LEADER) { - snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address); - } else if (r->state == RAFT_FOLLOWER) { - snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address); - } - - for (int i = 0; i < r->configuration.n; ++i) { - char tmpAddress[128]; - snprintf(tmpAddress, sizeof(tmpAddress), "%s", r->configuration.servers[i].address); - - uint64_t raftId = r->configuration.servers[i].id; - char host[128]; - uint16_t port; - uint16_t vid; - decodeRaftId(raftId, host, 128, &port, &vid); - - char buf[512]; - memset(buf, 0, sizeof(buf)); - if (strcmp(tmpAddress, leaderAddress) == 0) { - snprintf(buf, sizeof(buf), "<%s:%u-%u-LEADER>\t", host, port, vid); - } else { - snprintf(buf, sizeof(buf), "<%s:%u-%u-FOLLOWER>\t", host, port, vid); - } - printf("%s", buf); - } - printf("\n"); -} - -void printRaftConfiguration(struct raft_configuration *c) { - printf("configuration: \n"); - for (int i = 0; i < c->n; ++i) { - printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address); - } -} - -void printRaftState(struct raft *r) { - printf("----Raft State: -----------\n"); - printf("mem_addr: %p \n", r); - printf("my_id: %llu \n", r->id); - printf("address: %s \n", r->address); - printf("current_term: %llu \n", r->current_term); - printf("voted_for: %llu \n", r->voted_for); - printf("role: %s \n", state2String(r->state)); - printf("commit_index: %llu \n", r->commit_index); - printf("last_applied: %llu \n", r->last_applied); - printf("last_stored: %llu \n", r->last_stored); - - printf("configuration_index: %llu \n", r->configuration_index); - printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); - printRaftConfiguration(&r->configuration); - - printf("----------------------------\n"); -} - -void putValueCb(struct raft_apply *req, int status, void *result) { - raft_free(req); - struct raft *r = req->data; - if (status != 0) { - printf("putValueCb: %s \n", raft_errmsg(r)); - } else { - printf("putValueCb: %s \n", "ok"); - } -} - -void putValue(struct raft *r, const char *value) { - struct raft_buffer buf; - - buf.len = TOKEN_LEN;; - buf.base = raft_malloc(buf.len); - snprintf(buf.base, buf.len, "%s", value); - - struct raft_apply *req = raft_malloc(sizeof(struct raft_apply)); - req->data = r; - int ret = raft_apply(r, req, &buf, 1, putValueCb); - if (ret == 0) { - printf("put %s \n", (char*)buf.base); - } else { - printf("put error: %s \n", raft_errmsg(r)); - } -} - -void getValue(const char *key) { - char *ptr = getKV(key); - if (ptr) { - printf("get value: [%s] \n", ptr); - } else { - printf("value not found for key: [%s] \n", key); - } -} - -void console(SRaftServer *pRaftServer) { - while (1) { - char cmd_buf[COMMAND_LEN]; - memset(cmd_buf, 0, sizeof(cmd_buf)); - printf("(console)> "); - char *ret = fgets(cmd_buf, COMMAND_LEN, stdin); - if (!ret) { - exit(-1); - } - - int pos = strlen(cmd_buf); - if(cmd_buf[pos - 1] == '\n') { - cmd_buf[pos - 1] = '\0'; - } - - if (strncmp(cmd_buf, "", COMMAND_LEN) == 0) { - continue; - } - - char cmd[TOKEN_LEN]; - memset(cmd, 0, sizeof(cmd)); - - char param1[TOKEN_LEN]; - memset(param1, 0, sizeof(param1)); - - char param2[TOKEN_LEN]; - memset(param2, 0, sizeof(param2)); - - char param3[TOKEN_LEN]; - memset(param2, 0, sizeof(param2)); - - parseCommand4(cmd_buf, cmd, param1, param2, param3, TOKEN_LEN); - if (strcmp(cmd, "addnode") == 0) { - printf("not support \n"); - - /* - char host[HOST_LEN]; - uint32_t port; - parseAddr(param1, host, HOST_LEN, &port); - uint64_t rid = raftId(host, port); - - struct raft_change *req = raft_malloc(sizeof(*req)); - int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL); - if (r != 0) { - printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft)); - } - printf("add node: %lu %s \n", rid, param1); - - struct raft_change *req2 = raft_malloc(sizeof(*req2)); - r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL); - if (r != 0) { - printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft)); - } - */ - - } else if (strcmp(cmd, "dropnode") == 0) { - printf("not support \n"); - - } else if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) { - exit(0); - - } else if (strcmp(cmd, "rebalance") == 0 && strcmp(param1, "leader") == 0) { - - /* - updateLeaderStates(pRaftServer); - - int myLeaderCount; - for (int i = 0; i < NODE_COUNT; ++i) { - if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { - myLeaderCount = leaderStates[i].leaderCount; - } - } - - while (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { - printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); - - struct raft *r; - for (int j = 0; j < pRaftServer->instanceCount; ++j) { - if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { - r = &pRaftServer->instance[j].raft; - } - } - - struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); - transfer->data = pRaftServer; - - uint64_t destRaftId; - int minIndex = -1; - int minLeaderCount = myLeaderCount; - for (int j = 0; j < NODE_COUNT; ++j) { - if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue; - - printf("-----leaderStates[%d].leaderCount:%d \n", j, leaderStates[j].leaderCount); - if (leaderStates[j].leaderCount <= minLeaderCount) { - minIndex = j; - printf("++++ assign minIndex : %d \n", minIndex); - } - } - - printf("minIndex:%d minLeaderCount:%d \n", minIndex, minLeaderCount); - - char myHost[48]; - uint16_t myPort; - uint16_t myVid; - decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); - - char *destAddress = leaderStates[minIndex].address; - - char tokens[MAX_PEERS][MAX_TOKEN_LEN]; - splitString(destAddress, ":", tokens, 2); - char *destHost = tokens[0]; - uint16_t destPort = atoi(tokens[1]); - destRaftId = encodeRaftId(destHost, destPort, myVid); - - printf("destHost:%s destPort:%u myVid:%u", destHost, destPort, myVid); - raft_transfer(r, transfer, destRaftId, raftTransferCb); - sleep(1); - - for (int i = 0; i < NODE_COUNT; ++i) { - if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { - myLeaderCount = leaderStates[i].leaderCount; - } - } - } - */ - - - int leaderCount = 0; - - struct raft *firstR; - for (int i = 0; i < pRaftServer->instanceCount; ++i) { - struct raft *r = &pRaftServer->instance[i].raft; - if (r->state == RAFT_LEADER) { - leaderCount++; - firstR = r; - } - } - - if (leaderCount > pRaftServer->instanceCount / NODE_COUNT) { - struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); - transfer->data = pRaftServer; - raft_transfer(firstR, transfer, 0, raftTransferCb); - } - - - } else if (strcmp(cmd, "put") == 0) { - char buf[256]; - uint16_t vid; - sscanf(param1, "%hu", &vid); - snprintf(buf, sizeof(buf), "%s--%s", param2, param3); - putValue(&pRaftServer->instance[vid].raft, buf); - - } else if (strcmp(cmd, "get") == 0) { - getValue(param1); - - } else if (strcmp(cmd, "transfer") == 0) { - uint16_t vid; - sscanf(param1, "%hu", &vid); - - struct raft_transfer transfer; - raft_transfer(&pRaftServer->instance[vid].raft, &transfer, 0, NULL); - - - } else if (strcmp(cmd, "state") == 0) { - for (int i = 0; i < pRaftServer->instanceCount; ++i) { - printf("instance %d: ", i); - printRaftState(&pRaftServer->instance[i].raft); - } - - } else if (strcmp(cmd, "leader") == 0 && strcmp(param1, "state") == 0) { - updateLeaderStates(pRaftServer); - printf("\n--------------------------------------------\n"); - printLeaderCount(); - for (int i = 0; i < pRaftServer->instanceCount; ++i) { - printRaftState2(&pRaftServer->instance[i].raft); - } - printf("--------------------------------------------\n"); - - } else if (strcmp(cmd, "snapshot") == 0) { - printf("not support \n"); - - } else if (strcmp(cmd, "help") == 0) { - printf("addnode \"127.0.0.1:8888\" \n"); - printf("dropnode \"127.0.0.1:8888\" \n"); - printf("put key value \n"); - printf("get key \n"); - printf("state \n"); - - } else { - printf("unknown command: [%s], type \"help\" to see help \n", cmd); - } - - //printf("cmd_buf: [%s] \n", cmd_buf); - } -} - -void *startConsoleFunc(void *param) { - SRaftServer *pServer = (SRaftServer*)param; - console(pServer); - return NULL; -} - -// Config --------------------------------- -void usage() { - printf("\nusage: \n"); - printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name); - printf("\n"); - printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name); - printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name); - printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name); - printf("\n"); -} - -void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { - memset(pConf, 0, sizeof(*pConf)); - - int option_index, option_value; - option_index = 0; - static struct option long_options[] = { - {"help", no_argument, NULL, 'h'}, - {"peers", required_argument, NULL, 'p'}, - {"me", required_argument, NULL, 'm'}, - {"dir", required_argument, NULL, 'd'}, - {NULL, 0, NULL, 0} - }; - - while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) { - switch (option_value) { - case 'm': { - parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); - break; - } - - case 'p': { - char tokens[MAX_PEERS][MAX_TOKEN_LEN]; - int peerCount = splitString(optarg, ",", tokens, MAX_PEERS); - pConf->peersCount = peerCount; - for (int i = 0; i < peerCount; ++i) { - Addr *pAddr = &pConf->peers[i]; - parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port); - } - break; - } - - - case 'd': { - snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg); - break; - } - - case 'h': { - usage(); - exit(-1); - } - - default: { - usage(); - exit(-1); - } - } - } - snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s_%u", pConf->dir, pConf->me.host, pConf->me.port); -} - -void printConf(SRaftServerConfig *pConf) { - printf("\nconf: \n"); - printf("me: %s:%u \n", pConf->me.host, pConf->me.port); - printf("peersCount: %d \n", pConf->peersCount); - for (int i = 0; i < pConf->peersCount; ++i) { - Addr *pAddr = &pConf->peers[i]; - printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port); - } - printf("dataDir: %s \n\n", pConf->dataDir); - -} - - -int main(int argc, char **argv) { - srand(time(NULL)); - int32_t ret; - - exe_name = argv[0]; - if (argc < 3) { - usage(); - exit(-1); - } - - SRaftServerConfig conf; - parseConf(argc, argv, &conf); - printConf(&conf); - - signal(SIGPIPE, SIG_IGN); - - /* - char cmd_buf[COMMAND_LEN]; - snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir); - system(cmd_buf); - */ - - - struct raft_fsm fsm; - initFsm(&fsm); - - SRaftServer raftServer; - ret = raftServerInit(&raftServer, &conf, &fsm); - assert(ret == 0); - - pthread_t tidRaftServer; - pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer); - - pthread_t tidConsole; - pthread_create(&tidConsole, NULL, startConsoleFunc, &raftServer); - - while (1) { - sleep(10); - } - - return 0; -} diff --git a/contrib/test/traft/rebalance_leader/raftServer.c b/contrib/test/traft/rebalance_leader/raftServer.c deleted file mode 100644 index 165d3c9023fbe077c04d9bfa4f37784b07469675..0000000000000000000000000000000000000000 --- a/contrib/test/traft/rebalance_leader/raftServer.c +++ /dev/null @@ -1,224 +0,0 @@ -#include -#include -#include "common.h" -#include "raftServer.h" - -//char *keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);; -//char *values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); - - -char keys[MAX_KV_LEN][MAX_RECORD_COUNT]; -char values[MAX_KV_LEN][MAX_RECORD_COUNT]; -int writeIndex = 0; - -void initStore() { -} - -void destroyStore() { - //free(keys); - //free(values); -} - -void putKV(const char *key, const char *value) { - if (writeIndex < MAX_RECORD_COUNT) { - strncpy(keys[writeIndex], key, MAX_KV_LEN); - strncpy(values[writeIndex], value, MAX_KV_LEN); - writeIndex++; - } -} - -char *getKV(const char *key) { - for (int i = 0; i < MAX_RECORD_COUNT; ++i) { - if (strcmp(keys[i], key) == 0) { - return values[i]; - } - } - return NULL; -} - - -int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr) -{ - if (n_arr <= 0) { - return -1; - } - - char* tmp = (char*)malloc(strlen(str) + 1); - strcpy(tmp, str); - char* context; - int n = 0; - - char* token = strtok_r(tmp, separator, &context); - if (!token) { - goto ret; - } - strncpy(arr[n], token, MAX_TOKEN_LEN); - n++; - - while (1) { - token = strtok_r(NULL, separator, &context); - if (!token || n >= n_arr) { - goto ret; - } - strncpy(arr[n], token, MAX_TOKEN_LEN); - n++; - } - -ret: - free(tmp); - return n; -} - -/* -uint64_t raftId(const char *host, uint32_t port) { - uint32_t host_uint32 = (uint32_t)inet_addr(host); - assert(host_uint32 != (uint32_t)-1); - uint64_t code = ((uint64_t)host_uint32) << 32 | port; - return code; -} -*/ - - -/* -uint64_t encodeRaftId(const char *host, uint16_t port, uint16_t vid) { - uint64_t raftId; - uint32_t host_uint32 = (uint32_t)inet_addr(host); - assert(host_uint32 != (uint32_t)-1); - - raftId = (((uint64_t)host_uint32) << 32) | (((uint32_t)port) << 16) | vid; - return raftId; -} - -void decodeRaftId(uint64_t raftId, char *host, int32_t len, uint16_t *port, uint16_t *vid) { - uint32_t host32 = (uint32_t)((raftId >> 32) & 0x00000000FFFFFFFF); - - struct in_addr addr; - addr.s_addr = host32; - snprintf(host, len, "%s", inet_ntoa(addr)); - - *port = (uint16_t)((raftId & 0x00000000FFFF0000) >> 16); - *vid = (uint16_t)(raftId & 0x000000000000FFFF); -} -*/ - - - - -int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) { - int ret; - - snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host); - pRaftServer->port = pConf->me.port; - snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port); - //strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir)); - - ret = uv_loop_init(&pRaftServer->loop); - if (ret != 0) { - fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret)); - assert(0); - } - - ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); - if (ret != 0) { - fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret); - assert(0); - } - - - uint16_t vid; - pRaftServer->instanceCount = 20; - - - for (int i = 0; i < pRaftServer->instanceCount; ++i) - { - //vid = 0; - vid = i; - - - pRaftServer->instance[vid].raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); - snprintf(pRaftServer->instance[vid].dir, sizeof(pRaftServer->instance[vid].dir), "%s_%llu", pConf->dataDir, pRaftServer->instance[vid].raftId); - - char cmd_buf[COMMAND_LEN]; - snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", pRaftServer->instance[vid].dir); - system(cmd_buf); - sleep(1); - - pRaftServer->instance[vid].fsm = pFsm; - - ret = raft_uv_init(&pRaftServer->instance[vid].io, &pRaftServer->loop, pRaftServer->instance[vid].dir, &pRaftServer->transport); - if (ret != 0) { - fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft)); - assert(0); - } - - ret = raft_init(&pRaftServer->instance[vid].raft, &pRaftServer->instance[vid].io, pRaftServer->instance[vid].fsm, pRaftServer->instance[vid].raftId, pRaftServer->address); - if (ret != 0) { - fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft)); - assert(0); - } - - struct raft_configuration conf; - raft_configuration_init(&conf); - raft_configuration_add(&conf, pRaftServer->instance[vid].raftId, pRaftServer->address, RAFT_VOTER); - printf("add myself: %llu - %s \n", pRaftServer->instance[vid].raftId, pRaftServer->address); - for (int i = 0; i < pConf->peersCount; ++i) { - const Addr *pAddr = &pConf->peers[i]; - - raft_id rid = encodeRaftId(pAddr->host, pAddr->port, vid); - - char addrBuf[ADDRESS_LEN]; - snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port); - raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER); - printf("add peers: %llu - %s \n", rid, addrBuf); - } - - raft_bootstrap(&pRaftServer->instance[vid].raft, &conf); - - } - - - - - - - - return 0; -} - -int32_t raftServerStart(SRaftServer *pRaftServer) { - int ret; - - for (int i = 0; i < pRaftServer->instanceCount; ++i) { - ret = raft_start(&pRaftServer->instance[i].raft); - if (ret != 0) { - fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[i].raft)); - } - - } - - - uv_run(&pRaftServer->loop, UV_RUN_DEFAULT); -} - - -void raftServerClose(SRaftServer *pRaftServer) { - -} - - -int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { - char *msg = (char*)buf->base; - printf("fsm apply: %s \n", msg); - - char arr[2][MAX_TOKEN_LEN]; - splitString(msg, "--", arr, 2); - putKV(arr[0], arr[1]); - - return 0; -} - -int32_t initFsm(struct raft_fsm *fsm) { - initStore(); - fsm->apply = fsmApplyCb; - return 0; -} diff --git a/contrib/test/traft/rebalance_leader/raftServer.h b/contrib/test/traft/rebalance_leader/raftServer.h deleted file mode 100644 index 5ea43985c9b0601d884f40d8e544e1d0955d2bc0..0000000000000000000000000000000000000000 --- a/contrib/test/traft/rebalance_leader/raftServer.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef TDENGINE_RAFT_SERVER_H -#define TDENGINE_RAFT_SERVER_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include -#include -#include "raft.h" -#include "raft/uv.h" -#include "common.h" - - -// simulate a db store, just for test -#define MAX_KV_LEN 20 -#define MAX_RECORD_COUNT 16 - - -//char *keys; -//char *values; -//int writeIndex; - -void initStore(); -void destroyStore(); -void putKV(const char *key, const char *value); -char *getKV(const char *key); - -typedef struct { - char dir[DIR_LEN + HOST_LEN * 4]; /* Data dir of UV I/O backend */ - raft_id raftId; /* For vote */ - struct raft_fsm *fsm; /* Sample application FSM */ - struct raft raft; /* Raft instance */ - struct raft_io io; /* UV I/O backend */ - -} SInstance; - -typedef struct { - char host[HOST_LEN]; - uint32_t port; - char address[ADDRESS_LEN]; /* Raft instance address */ - - struct uv_loop_s loop; /* UV loop */ - struct raft_uv_transport transport; /* UV I/O backend transport */ - - SInstance instance[MAX_INSTANCE_NUM]; - int32_t instanceCount; - -} SRaftServer; - -#define MAX_TOKEN_LEN 32 -int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr); - -int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm); -int32_t raftServerStart(SRaftServer *pRaftServer); -void raftServerClose(SRaftServer *pRaftServer); - - -int initFsm(struct raft_fsm *fsm); - - - - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_RAFT_SERVER_H diff --git a/contrib/test/traft/single_node/CMakeLists.txt b/contrib/test/traft/single_node/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..666ce271b856d405b9b9e6e07014f69fb6b709f8 --- /dev/null +++ b/contrib/test/traft/single_node/CMakeLists.txt @@ -0,0 +1,6 @@ +add_executable(singleNode "") +target_sources(singleNode + PRIVATE + "singleNode.c" +) +target_link_libraries(singleNode PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/single_node/clear.sh b/contrib/test/traft/single_node/clear.sh new file mode 100644 index 0000000000000000000000000000000000000000..398b3088f20ae8cce179ff909f206fc162918876 --- /dev/null +++ b/contrib/test/traft/single_node/clear.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +rm -rf 127.0.0.1* +rm -rf ./data diff --git a/contrib/test/traft/single_node/cmd b/contrib/test/traft/single_node/cmd new file mode 100644 index 0000000000000000000000000000000000000000..ff8a156f64025e588f226e8bc56f1a8b39298340 --- /dev/null +++ b/contrib/test/traft/single_node/cmd @@ -0,0 +1,6 @@ +all: + gcc singleNode.c -I ../../include/ ../../.libs/libraft.a -o singleNode -luv -llz4 -lpthread -g +clean: + rm -f singleNode + sh clear.sh + diff --git a/contrib/test/traft/single_node/singleNode.c b/contrib/test/traft/single_node/singleNode.c new file mode 100644 index 0000000000000000000000000000000000000000..d22af30e451fd5f536d8b908362e713ac29c7f3a --- /dev/null +++ b/contrib/test/traft/single_node/singleNode.c @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include +//#include + +#include "raft.h" + +SRaftEnv raftEnv; + +typedef struct Tsdb { + uint64_t lastApplyIndex; + void *mem; + void *imm; + void *store; +} Tsdb; + +void tsdbWrite(Tsdb *t, char *msg) {} + +void *startFunc(void *param) { + SRaftEnv *pSRaftEnv = (SRaftEnv *)param; + int32_t r = raftEnvStart(pSRaftEnv); + assert(r == 0); + return NULL; +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result, raft_index index) { + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: index:%llu value:%s \n", index, msg); + + Tsdb *t = pFsm->data; + if (index > t->lastApplyIndex) { + // apply value into tsdb + tsdbWrite(t, msg); + + // update lastApplyIndex + t->lastApplyIndex = index; + } + + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + void *ptr = req->data; + if (status != 0) { + printf("putValueCb error \n"); + } else { + printf("putValueCb ok \n"); + } + free(ptr); + free(req); +} + +void submitValue() { + // prepare value + struct raft_buffer buf; + buf.len = 32; + void *ptr = malloc(buf.len); + buf.base = ptr; + snprintf(buf.base, buf.len, "%ld", time(NULL)); + + // get raft + struct raft *r = getRaft(&raftEnv, 100); + assert(r != NULL); + // printRaftState(r); + + // submit value + struct raft_apply *req = malloc(sizeof(*req)); + req->data = ptr; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +int main(int argc, char **argv) { + // init raft env + int r = raftEnvInit(&raftEnv, "127.0.0.1", 38000, "./data"); + assert(r == 0); + + // start raft env + pthread_t tid; + pthread_create(&tid, NULL, startFunc, &raftEnv); + + // wait for start, just for simple + while (raftEnv.isStart != 1) { + sleep(1); + } + + // init fsm + struct raft_fsm *pFsm = malloc(sizeof(*pFsm)); + pFsm->apply = fsmApplyCb; + Tsdb *tsdb = malloc(sizeof(*tsdb)); + pFsm->data = tsdb; + + // add vgroup, id = 100, only has one replica + r = addRaftVoter(&raftEnv, NULL, 0, 100, pFsm); + assert(r == 0); + + // for test: submit a value every second + while (1) { + sleep(1); + submitValue(); + } + + return 0; +} diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 62f55609ce2fd9bab781af6c675dd93d287fcc3e..e12a46984f7723b86e1f2091b4400dc01c1010fb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1579,32 +1579,47 @@ typedef struct SMqSetCVgRsp { char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; -typedef struct SMqCVConsumeReq { +typedef struct SMqConsumeReq { int64_t reqId; int64_t offset; int64_t consumerId; int64_t blockingTime; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; -} SMqCVConsumeReq; +} SMqConsumeReq; -typedef struct SMqConsumeRspBlock { - int32_t bodyLen; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char body[]; -} SMqConsumeRspBlock; - -typedef struct SMqCVConsumeRsp { - int64_t reqId; - int64_t clientId; - int64_t committedOffset; - int64_t receiveOffset; - int64_t rspOffset; - int32_t skipLogNum; - int32_t bodyLen; - char topicName[TSDB_TOPIC_FNAME_LEN]; - SMqConsumeRspBlock blocks[]; -} SMqCvConsumeRsp; +typedef struct SMqColData { + int16_t colId; + int16_t type; + int16_t bytes; + char data[]; +} SMqColData; + +typedef struct SMqTbData { + int64_t uid; + int32_t numOfCols; + int32_t numOfRows; + SMqColData colData[]; +} SMqTbData; + +typedef struct SMqTopicBlk { + char topicName[TSDB_TOPIC_FNAME_LEN]; + int64_t committedOffset; + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; + int32_t bodyLen; + int32_t numOfTb; + SMqTbData tbData[]; +} SMqTopicData; + +typedef struct SMqConsumeRsp { + int64_t reqId; + int64_t clientId; + int32_t bodyLen; + int32_t numOfTopics; + SMqTopicData data[]; +} SMqConsumeRsp; #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 159a92b0ab4c3ca1b26c05121a448ac2a30a1d94..9cc66d68794a086034c9af3be9050b471d13eeeb 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -117,7 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - p->pAppHbMgr = appHbMgrInit(p); + /*p->pAppHbMgr = appHbMgrInit(p);*/ taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index ec088eb0735824f3348287ec54fd476f00dc7adb..81ea18fe08afa1ab70af3f06b994f167a9423dc4 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -72,7 +72,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); + /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/ // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 32730fc1e3a4631c91bbf01e2b36a3255566dee1..f952f69f20d1ef6168e1e93c820cf454626e9b09 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -621,5 +621,7 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey); dndReleaseMnode(pDnode, pMnode); + + dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); return code; } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index f4fda75bd86e20087c38e6b566089823443ede38..104e702afb40e7e768d15e70a38eb776d1da2f6b 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -171,7 +171,7 @@ static int32_t dndInitClient(SDnode *pDnode) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = "DND-C"; + rpcInit.label = "D-C"; rpcInit.numOfThreads = 1; rpcInit.cfp = dndProcessResponse; rpcInit.sessions = 1024; @@ -282,12 +282,12 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SDnode *pDnode = parent; if (dndAuthInternalReq(parent, user, spi, encrypt, secret, ckey) == 0) { - // dTrace("get internal auth success"); + dTrace("user:%s, get auth from internal mnode, spi:%d encrypt:%d", user, *spi, *encrypt); return 0; } if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) { - // dTrace("get auth from internal mnode"); + dTrace("user:%s, get auth from internal mnode, spi:%d encrypt:%d", user, *spi, *encrypt); return 0; } @@ -296,13 +296,12 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char return -1; } - // dDebug("user:%s, send auth msg to other mnodes", user); - SAuthReq *pReq = rpcMallocCont(sizeof(SAuthReq)); tstrncpy(pReq->user, user, TSDB_USER_LEN); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = sizeof(SAuthReq), .msgType = TDMT_MND_AUTH}; SRpcMsg rpcRsp = {0}; + dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, pReq->spi, pReq->encrypt); dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { @@ -314,7 +313,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char memcpy(ckey, pRsp->ckey, TSDB_PASSWORD_LEN); *spi = pRsp->spi; *encrypt = pRsp->encrypt; - dDebug("user:%s, success to get user auth from other mnodes", user); + dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, pRsp->spi, pRsp->encrypt); } rpcFreeCont(rpcRsp.pCont); @@ -333,7 +332,7 @@ static int32_t dndInitServer(SDnode *pDnode) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = pDnode->cfg.serverPort; - rpcInit.label = "DND-S"; + rpcInit.label = "D-S"; rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = dndProcessRequest; rpcInit.sessions = pDnode->cfg.maxShellConns; diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index 52730412e90a883de3724ece67dce7c2a9e39bbc..3e9ab99a455036077081d94eae3eb7a4ecc2abb5 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -28,6 +28,14 @@ void mndCleanupAuth(SMnode *pMnode) {} int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; } static int32_t mndProcessAuthReq(SMnodeMsg *pReq) { - mDebug("user:%s, auth req is processed", pReq->user); - return 0; + SAuthReq *pAuth = pReq->rpcMsg.pCont; + + int32_t contLen = sizeof(SAuthRsp); + SAuthRsp *pRsp = rpcMallocCont(contLen); + pReq->pCont = pRsp; + pReq->contLen = contLen; + + int32_t code = mndRetriveAuth(pReq->pMnode, pAuth->user, &pRsp->spi, &pRsp->encrypt, pRsp->secret, pRsp->ckey); + mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->user, pAuth->spi, pAuth->encrypt, pAuth->user); + return code; } \ No newline at end of file diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index bb0ee8dfc40d3b4fcae513ebe580e1561290b5fa..9e6ecb6e23c5a7819651db1036cfdfdf323df636 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -68,12 +68,13 @@ typedef struct { typedef struct STqReadHandle { int64_t ver; + uint64_t tbUid; SSubmitMsg* pMsg; SSubmitBlk* pBlock; SSubmitMsgIter msgIter; SSubmitBlkIter blkIter; SMeta* pMeta; - SArray* pColumnIdList; + SArray* pColIdList; } STqReadHandle; /* ------------------------ SVnode ------------------------ */ @@ -199,8 +200,12 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta); -static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColumnIdList) { - pReadHandle->pColumnIdList = pColumnIdList; +static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { + pReadHandle->pColIdList = pColIdList; +} + +static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) { + pHandle->tbUid = tbUid; } void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index eca02c867cff6ff900c0cc0e8ce3dc3e8565ace1..b18f50cd3f24606212e002ebca43bd35b20eb9e3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,6 +15,7 @@ #include "tqInt.h" #include "tqMetaStore.h" +#include "tcompare.h" // static // read next version data @@ -424,7 +425,7 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/ /*if (code < 0) {*/ - // TODO: error + // TODO: error /*}*/ // get msgType // if submitblk @@ -609,62 +610,92 @@ int tqItemSSize() { } int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { - SMqCVConsumeReq* pReq = pMsg->pCont; + SMqConsumeReq* pReq = pMsg->pCont; + SRpcMsg rpcMsg; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; int64_t reqOffset = pReq->offset; int64_t fetchOffset = reqOffset; int64_t blockingTime = pReq->blockingTime; + int rspLen = 0; + STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); - int sz = taosArrayGetSize(pConsumer->topics); + int sz = taosArrayGetSize(pConsumer->topics); - for (int i = 0 ; i < sz; i++) { - STqTopicHandle *pTopic = taosArrayGet(pConsumer->topics, i); + for (int i = 0; i < sz; i++) { + STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i); - int8_t pos = fetchOffset % TQ_BUFFER_SIZE; - int8_t old = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); - if (old == 1) { - // do nothing - continue; - } - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - return -1; - } - SWalHead* pHead = pTopic->pReadhandle->pHead; + int8_t pos; + int8_t skip = 0; + SWalHead* pHead; while (1) { + pos = fetchOffset % TQ_BUFFER_SIZE; + skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); + if (skip == 1) { + // do nothing + break; + } + if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + // check err + atomic_store_8(&pTopic->buffer.output[pos].status, 0); + skip = 1; + break; + } // read until find TDMT_VND_SUBMIT + pHead = pTopic->pReadhandle->pHead; + if (pHead->head.msgType == TDMT_VND_SUBMIT) { + break; + } if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - return -1; + atomic_store_8(&pTopic->buffer.output[pos].status, 0); + skip = 1; + break; } + atomic_store_8(&pTopic->buffer.output[pos].status, 0); + fetchOffset++; } + if (skip == 1) continue; SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; - void* task = pTopic->buffer.output[pos].task; + qTaskInfo_t task = pTopic->buffer.output[pos].task; qSetStreamInput(task, pCont); - SSDataBlock* pDataBlock; - uint64_t ts; - if (qExecTask(task, &pDataBlock, &ts) < 0) { + //SArray + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + while (1) { + SSDataBlock* pDataBlock; + uint64_t ts; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + break; + } + if (pDataBlock != NULL) { + taosArrayPush(pRes, pDataBlock); + } else { + break; + } + } + + atomic_store_8(&pTopic->buffer.output[pos].status, 0); + + if (taosArrayGetSize(pRes) == 0) { + taosArrayDestroy(pRes); + fetchOffset++; + continue; } - // TODO: launch query and get output data - pTopic->buffer.output[pos].dst = pDataBlock; - if (pTopic->buffer.firstOffset == -1 - || pReq->offset < pTopic->buffer.firstOffset) { + + pTopic->buffer.output[pos].dst = pRes; + if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) { pTopic->buffer.firstOffset = pReq->offset; } - if (pTopic->buffer.lastOffset == -1 - || pReq->offset > pTopic->buffer.lastOffset) { + if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { pTopic->buffer.lastOffset = pReq->offset; } - atomic_store_8(&pTopic->buffer.output[pos].status, 1); - // put output into rsp } // launch query // get result - SMqCvConsumeRsp* pRsp; return 0; } @@ -673,14 +704,14 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { if (pConsumer == NULL) { return -1; } - + STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); if (pTopic == NULL) { free(pConsumer); return -1; } - strcpy(pTopic->topicName, pReq->topicName); - strcpy(pTopic->cgroup, pReq->cgroup); + strcpy(pTopic->topicName, pReq->topicName); + strcpy(pTopic->cgroup, pReq->cgroup); strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->physicalPlan, pReq->physicalPlan); @@ -689,7 +720,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { pTopic->buffer.lastOffset = -1; pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); if (pTopic->pReadhandle == NULL) { - } for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; @@ -708,7 +738,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->pMeta = pMeta; pReadHandle->pMsg = NULL; pReadHandle->ver = -1; - pReadHandle->pColumnIdList = NULL; + pReadHandle->pColIdList = NULL; return NULL; } @@ -720,20 +750,18 @@ void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ve } bool tqNextDataBlock(STqReadHandle* pHandle) { - if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { - return false; + while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) >= 0) { + if (pHandle->tbUid == pHandle->pBlock->uid) return true; } - return true; + return false; } int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - SMemRow row; - int32_t sversion = pHandle->pBlock->sversion; - SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false); - pBlockInfo->numOfCols = pSchema->nCols; + /*int32_t sversion = pHandle->pBlock->sversion;*/ + /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/ + pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->uid = pHandle->pBlock->uid; - // TODO: filter out unused column return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 9b1f80f32959a66e9a89baaccb3a622cb60f5e55..dcdc03adf6666f79d4e2375f34449cd5df2de430 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -96,7 +96,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } - vInfo("vgId:%d process create %"PRIzu" tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); + vDebug("vgId:%d process create %"PRIzu" tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); break; diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index f381768a34de1a6c46a55f5aeaa7d495f9d4d655..d870ae98ab886f17ea866f27dcde381fc73c6242 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -752,8 +752,8 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid, - hashstr); + tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid, sid, + hashstr, pConn->spi); } return pConn; @@ -1612,7 +1612,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { } } } else { - tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); + tError("%s, auth spi:%d not matched with received:%d %p", pConn->info, pConn->spi, pHead->spi, pConn); code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f2d844f73d1bb6c253ed60b045341c4a33962d75..00bc1b621fb05445629f0ee21324629e9ee70f98 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,6 +21,7 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t* writeReq; + void* hostThrd; SConnBuffer readBuf; void* data; queue conn; @@ -45,7 +46,7 @@ typedef struct SCliThrdObj { queue msg; pthread_mutex_t msgMtx; uint64_t nextTimeout; // next timeout - void* shandle; // + void* pTransInst; // } SCliThrdObj; @@ -69,7 +70,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* // register timer in each thread to clear expire conn static void clientTimeoutCb(uv_timer_t* handle); -// process data read from server, auth/decompress etc +// process data read from server, auth/decompress etc later static void clientProcessData(SCliConn* conn); // check whether already read complete packet from server static bool clientReadComplete(SConnBuffer* pBuf); @@ -91,20 +92,25 @@ static void* clientThread(void* arg); static void clientProcessData(SCliConn* conn) { STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; - SRpcInfo* pRpc = pCtx->ahandle; + SRpcInfo* pRpc = pCtx->pRpc; SRpcMsg rpcMsg; rpcMsg.pCont = conn->readBuf.buf; rpcMsg.contLen = conn->readBuf.len; rpcMsg.ahandle = pCtx->ahandle; (pRpc->cfp)(NULL, &rpcMsg, NULL); + + SCliThrdObj* pThrd = conn->hostThrd; + addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn); + free(pCtx->ip); + free(pCtx); // impl } static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; - SRpcInfo* pRpc = pThrd->shandle; + SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); @@ -127,7 +133,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { } static void* connCacheCreate(int size) { SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - return false; + return cache; } static void* connCacheDestroy(void* cache) { SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); @@ -153,8 +159,9 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { if (plist == NULL) { SConnList list; plist = &list; - QUEUE_INIT(&plist->conn); taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist)); + plist = taosHashGet(pCache, key, strlen(key)); + QUEUE_INIT(&plist->conn); } if (QUEUE_IS_EMPTY(&plist->conn)) { @@ -169,8 +176,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx; - SRpcInfo* pRpc = ctx->pRpc; + SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); // list already create before @@ -200,10 +206,11 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; if (pBuf->cap == 0) { - pBuf->buf = (char*)calloc(CAPACITY, sizeof(char)); + pBuf->buf = (char*)calloc(1, CAPACITY * sizeof(char)); pBuf->len = 0; pBuf->cap = CAPACITY; pBuf->left = -1; + buf->base = pBuf->buf; buf->len = CAPACITY; } else { @@ -213,7 +220,7 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, pBuf->buf = realloc(pBuf->buf, pBuf->cap); } else if (pBuf->len + pBuf->left > pBuf->cap) { pBuf->cap = pBuf->len + pBuf->left; - pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left); + pBuf->buf = realloc(pBuf->buf, pBuf->cap); } } buf->base = pBuf->buf + pBuf->len; @@ -227,7 +234,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { - tDebug("alread read complete pack"); + tDebug("alread read complete"); clientProcessData(conn); } else { tDebug("read halp packet, continue to read"); @@ -260,7 +267,12 @@ static void clientWriteCb(uv_write_t* req, int status) { uv_close((uv_handle_t*)pConn->stream, clientDestroy); return; } - + SCliThrdObj* pThrd = pConn->hostThrd; + if (pConn->stream == NULL) { + pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); + pConn->stream->data = pConn; + } uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); // impl later } @@ -270,35 +282,35 @@ static void clientWrite(SCliConn* pConn) { SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - int msgLen = transMsgLenFromCont(pMsg->contLen); - char* msg = (char*)(pHead); + int msgLen = transMsgLenFromCont(pMsg->contLen); + + pHead->msgType = pMsg->msgType; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - uv_buf_t wb = uv_buf_init(msg, msgLen); + uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); + tDebug("data write out, msgType : %d, len: %d", pHead->msgType, msgLen); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; - if (status != 0) { - tError("failed to connect %s", uv_err_name(status)); - clientConnDestroy(pConn); - return; - } + SCliMsg* pMsg = pConn->data; - SCliMsg* pMsg = pConn->data; - STransConnCtx* pCtx = ((SCliMsg*)(pConn->data))->ctx; - - SRpcMsg rpcMsg; - rpcMsg.ahandle = pCtx->ahandle; + STransConnCtx* pCtx = pMsg->ctx; + SRpcInfo* pRpc = pCtx->pRpc; if (status != 0) { + // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); + tError("failed to connect server, errmsg: %s", uv_strerror(status)); // call user fp later - tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - SRpcInfo* pRpc = pMsg->ctx->pRpc; + SRpcMsg rpcMsg; + rpcMsg.ahandle = pCtx->ahandle; + // SRpcInfo* pRpc = pMsg->ctx->pRpc; (pRpc->cfp)(NULL, &rpcMsg, NULL); uv_close((uv_handle_t*)req->handle, clientDestroy); return; } + assert(pConn->stream == req->handle); clientWrite(pConn); } @@ -315,17 +327,27 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { // impl later conn->data = pMsg; conn->writeReq->data = conn; + + conn->readBuf.len = 0; + memset(conn->readBuf.buf, 0, conn->readBuf.cap); + conn->readBuf.left = -1; clientWrite(conn); } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); + // read/write stream handle conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + conn->stream->data = conn; + + // write req handle conn->writeReq = malloc(sizeof(uv_write_t)); + conn->writeReq->data = conn; QUEUE_INIT(&conn->conn); conn->connReq.data = conn; conn->data = pMsg; + conn->hostThrd = pThrd; struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); @@ -359,23 +381,24 @@ static void clientAsyncCb(uv_async_t* handle) { static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; - SRpcInfo* pRpc = pThrd->shandle; - pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; - uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0); + uv_run(pThrd->loop, UV_RUN_DEFAULT); } void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SClientObj* cli = calloc(1, sizeof(SClientObj)); + SRpcInfo* pRpc = shandle; memcpy(cli->label, label, strlen(label)); cli->numOfThreads = numOfThreads; cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); for (int i = 0; i < cli->numOfThreads; i++) { SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); @@ -385,8 +408,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, pThrd->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pThrd->pTimer); + pThrd->pTimer->data = pThrd; + pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; - pThrd->shandle = shandle; + pThrd->cache = connCacheCreate(1); + pThrd->pTransInst = shandle; int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); if (err == 0) { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 45425410430d74ec7754b413af95d5b03d858788..77b5f635f4e4b31cd8d9c8681a358d4cdfb0fa8c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -24,13 +24,15 @@ typedef struct SConn { uv_async_t* pWorkerAsync; queue queue; int ref; - int persist; // persist connection or not - SConnBuffer connBuf; // read buf, - SConnBuffer writeBuf; // write buf + int persist; // persist connection or not + SConnBuffer connBuf; // read buf, int count; - void* shandle; // rpc init - void* ahandle; // + int inType; + void* pTransInst; // rpc init + void* ahandle; // void* hostThrd; + + SRpcMsg sendMsg; // del later char secured; int spi; @@ -48,7 +50,7 @@ typedef struct SWorkThrdObj { uv_async_t* workerAsync; // queue conn; pthread_mutex_t connMtx; - void* shandle; + void* pTransInst; } SWorkThrdObj; typedef struct SServerObj { @@ -66,7 +68,7 @@ typedef struct SServerObj { static const char* notify = "a"; // refactor later -static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen); +static int transAddAuthPart(SConn* pConn, char* msg, int msgLen); static int uvAuthMsg(SConn* pConn, char* msg, int msgLen); @@ -75,10 +77,13 @@ static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnTimeoutCb(uv_timer_t* handle); static void uvOnWriteCb(uv_write_t* req, int status); +static void uvOnPipeWriteCb(uv_write_t* req, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); +static void uvPrepareSendData(SConn* conn, uv_buf_t* wb); + // already read complete packet static bool readComplete(SConnBuffer* buf); @@ -135,25 +140,28 @@ static bool readComplete(SConnBuffer* data) { if (msgLen > data->len) { data->left = msgLen - data->len; return false; - } else { + } else if (msgLen == data->len) { return true; + } else if (msgLen < data->len) { + return false; + // handle other packet later } } else { return false; } } -static void uvDoProcess(SRecvInfo* pRecv) { - // impl later - STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; - SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; - SConn* pConn = pRecv->thandle; - tDump(pRecv->msg, pRecv->msgLen); - terrno = 0; - // SRpcReqContext* pContest; - - // do auth and check -} +// static void uvDoProcess(SRecvInfo* pRecv) { +// // impl later +// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; +// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; +// SConn* pConn = pRecv->thandle; +// tDump(pRecv->msg, pRecv->msgLen); +// terrno = 0; +// // SRpcReqContext* pContest; +// +// // do auth and check +//} static int uvAuthMsg(SConn* pConn, char* msg, int len) { STransMsgHead* pHead = (STransMsgHead*)msg; @@ -222,12 +230,13 @@ static void uvProcessData(SConn* pConn) { p->msgLen = pBuf->len; p->ip = 0; p->port = 0; - p->shandle = pConn->shandle; // + p->shandle = pConn->pTransInst; // p->thandle = pConn; p->chandle = NULL; - // STransMsgHead* pHead = (STransMsgHead*)p->msg; + + pConn->inType = pHead->msgType; assert(transIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; @@ -247,7 +256,9 @@ static void uvProcessData(SConn* pConn) { // add compress later // pHead = rpcDecompressRpcMsg(pHead); } else { + pHead->msgLen = htonl(pHead->msgLen); // impl later + // } rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = pHead->content; @@ -257,7 +268,7 @@ static void uvProcessData(SConn* pConn) { rpcMsg.handle = pConn; (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); - uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0); + uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth // validate msg type } @@ -277,8 +288,9 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } if (nread != UV_EOF) { - tDebug("Read error %s\n", uv_err_name(nread)); + tDebug("Read error %s", uv_err_name(nread)); } + tDebug("read error %s", uv_err_name(nread)); uv_close((uv_handle_t*)cli, uvConnDestroy); } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -293,16 +305,48 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnWriteCb(uv_write_t* req, int status) { SConn* conn = req->data; + + SConnBuffer* buf = &conn->connBuf; + buf->len = 0; + memset(buf->buf, 0, buf->cap); + buf->left = -1; if (status == 0) { tDebug("data already was written on stream"); } else { + tDebug("failed to write data, %s", uv_err_name(status)); connDestroy(conn); } // opt } +static void uvOnPipeWriteCb(uv_write_t* req, int status) { + if (status == 0) { + tDebug("success to dispatch conn to work thread"); + } else { + tError("fail to dispatch conn to work thread"); + } +} +static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { + // impl later + SRpcMsg* pMsg = &conn->sendMsg; + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + pHead->msgType = conn->inType + 1; + // add more info + char* msg = (char*)pHead; + int32_t len = transMsgLenFromCont(pMsg->contLen); + if (transCompressMsg(msg, len, NULL)) { + // impl later + } + pHead->msgLen = htonl(len); + wb->base = msg; + wb->len = len; +} void uvWorkerAsyncCb(uv_async_t* handle) { - SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); + SWorkThrdObj* pThrd = handle->data; SConn* conn = NULL; queue wq; // batch process to avoid to lock/unlock frequently @@ -318,8 +362,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("except occurred, do nothing"); return; } - uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); - + uv_buf_t wb; + uvPrepareSendData(conn, &wb); uv_timer_stop(conn->pTimer); uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); @@ -341,8 +385,9 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; + tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); - uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); + uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { uv_close((uv_handle_t*)cli, NULL); } @@ -374,7 +419,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { assert(pending == UV_TCP); SConn* pConn = connCreate(); - pConn->shandle = pThrd->shandle; + pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pConn->pTimer); @@ -398,6 +443,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tDebug("new connection created: %d", fd); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); } else { + tDebug("failed to create new connection"); connDestroy(pConn); } } @@ -418,14 +464,12 @@ void* acceptThread(void* arg) { } uv_run(srv->loop, UV_RUN_DEFAULT); } -void* workerThread(void* arg) { - SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; - +static void initWorkThrdObj(SWorkThrdObj* pThrd) { pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); // SRpcInfo* pRpc = pThrd->shandle; - uv_pipe_init(pThrd->loop, pThrd->pipe, 0); + uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; @@ -435,8 +479,12 @@ void* workerThread(void* arg) { pThrd->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); + pThrd->workerAsync->data = pThrd; uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +} +void* workerThread(void* arg) { + SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -444,34 +492,39 @@ static SConn* connCreate() { SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); return pConn; } +static void connCloseCb(uv_handle_t* handle) { + // impl later + // +} static void connDestroy(SConn* conn) { if (conn == NULL) { return; } uv_timer_stop(conn->pTimer); free(conn->pTimer); - uv_close((uv_handle_t*)conn->pTcp, NULL); - free(conn->connBuf.buf); + // uv_close((uv_handle_t*)conn->pTcp, connCloseCb); free(conn->pTcp); + free(conn->connBuf.buf); free(conn->pWriter); - free(conn); + // free(conn); // handle } static void uvConnDestroy(uv_handle_t* handle) { SConn* conn = handle->data; connDestroy(conn); } -static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen) { - SRpcHead* pHead = (SRpcHead*)msg; +static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) { + STransMsgHead* pHead = (STransMsgHead*)msg; if (pConn->spi && pConn->secured == 0) { // add auth part pHead->spi = pConn->spi; - SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen); + STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen); pDigest->timeStamp = htonl(taosGetTimestampSec()); msgLen += sizeof(SRpcDigest); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); } else { pHead->spi = 0; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); @@ -502,9 +555,11 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - thrd->shandle = shandle; + thrd->pTransInst = shandle; thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read + + initWorkThrdObj(thrd); int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); if (err == 0) { tDebug("sucess to create worker-thread %d", i); @@ -547,6 +602,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) { SWorkThrdObj* pThrd = pConn->hostThrd; // opt later + pConn->sendMsg = *pMsg; pthread_mutex_lock(&pThrd->connMtx); QUEUE_PUSH(&pThrd->conn, &pConn->queue); pthread_mutex_unlock(&pThrd->connMtx);