From 31868e951a372683fe15d1a8b8dbebba1296bcdd Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 30 Dec 2021 20:55:44 +0800 Subject: [PATCH] add traft --- cmake/cmake.options | 6 + cmake/traft_CMakeLists.txt.in | 14 + contrib/CMakeLists.txt | 18 + contrib/test/CMakeLists.txt | 4 + contrib/test/traft/CMakeLists.txt | 7 + contrib/test/traft/clear.sh | 4 + contrib/test/traft/common.h | 36 ++ contrib/test/traft/help.txt | 18 + contrib/test/traft/raftMain.c | 659 ++++++++++++++++++++++++++++++ contrib/test/traft/raftServer.c | 222 ++++++++++ contrib/test/traft/raftServer.h | 68 +++ 11 files changed, 1056 insertions(+) create mode 100644 cmake/traft_CMakeLists.txt.in create mode 100644 contrib/test/traft/CMakeLists.txt create mode 100644 contrib/test/traft/clear.sh create mode 100644 contrib/test/traft/common.h create mode 100644 contrib/test/traft/help.txt create mode 100644 contrib/test/traft/raftMain.c create mode 100644 contrib/test/traft/raftServer.c create mode 100644 contrib/test/traft/raftServer.h diff --git a/cmake/cmake.options b/cmake/cmake.options index 44fa8c7e4b..faa45256fb 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -56,6 +56,12 @@ option( OFF ) +option( + BUILD_WITH_TRAFT + "If build with traft" + OFF +) + option( BUILD_DEPENDENCY_TESTS "If build dependency tests" diff --git a/cmake/traft_CMakeLists.txt.in b/cmake/traft_CMakeLists.txt.in new file mode 100644 index 0000000000..9b571b3666 --- /dev/null +++ b/cmake/traft_CMakeLists.txt.in @@ -0,0 +1,14 @@ + +# traft +ExternalProject_Add(traft + GIT_REPOSITORY https://github.com/taosdata/traft.git + GIT_TAG for_3.0 + SOURCE_DIR "${CMAKE_CONTRIB_DIR}/traft" + BINARY_DIR "${CMAKE_CONTRIB_DIR}/traft" + #BUILD_IN_SOURCE TRUE + # https://answers.ros.org/question/333125/how-to-include-external-automakeautoconf-projects-into-ament_cmake/ + CONFIGURE_COMMAND COMMAND autoreconf -i COMMAND ./configure --enable-example + BUILD_COMMAND "$(MAKE)" + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index c08f894fe7..074014b0d7 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -41,6 +41,12 @@ if(${BUILD_WITH_CRAFT}) SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE) endif(${BUILD_WITH_CRAFT}) +# traft +if(${BUILD_WITH_TRAFT}) + cat("${CMAKE_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE) +endif(${BUILD_WITH_TRAFT}) + #libuv if(${BUILD_WITH_UV}) cat("${CMAKE_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -173,6 +179,18 @@ if(${BUILD_WITH_CRAFT}) # ) endif(${BUILD_WITH_CRAFT}) +# TRAFT +if(${BUILD_WITH_TRAFT}) + add_library(traft STATIC IMPORTED GLOBAL) + set_target_properties(traft PROPERTIES + IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a" + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include" + ) + # target_link_libraries(craft + # INTERFACE pthread + # ) +endif(${BUILD_WITH_TRAFT}) + # LIBUV if(${BUILD_WITH_UV}) add_subdirectory(libuv) diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index 330fe8f70f..0c71113056 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -19,4 +19,8 @@ if(${BUILD_WITH_CRAFT}) add_subdirectory(craft) endif(${BUILD_WITH_CRAFT}) +if(${BUILD_WITH_TRAFT}) + add_subdirectory(traft) +endif(${BUILD_WITH_TRAFT}) + add_subdirectory(tdev) diff --git a/contrib/test/traft/CMakeLists.txt b/contrib/test/traft/CMakeLists.txt new file mode 100644 index 0000000000..e29fea04f1 --- /dev/null +++ b/contrib/test/traft/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(raftMain "") +target_sources(raftMain + PRIVATE + "raftMain.c" + "raftServer.c" +) +target_link_libraries(raftMain PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/clear.sh b/contrib/test/traft/clear.sh new file mode 100644 index 0000000000..398b3088f2 --- /dev/null +++ b/contrib/test/traft/clear.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +rm -rf 127.0.0.1* +rm -rf ./data diff --git a/contrib/test/traft/common.h b/contrib/test/traft/common.h new file mode 100644 index 0000000000..0229c29cf7 --- /dev/null +++ b/contrib/test/traft/common.h @@ -0,0 +1,36 @@ +#ifndef TDENGINE_COMMON_H +#define TDENGINE_COMMON_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +#define MAX_INSTANCE_NUM 100 + +#define MAX_PEERS 10 +#define COMMAND_LEN 1024 +#define TOKEN_LEN 128 +#define DIR_LEN 256 +#define HOST_LEN 64 +#define ADDRESS_LEN (HOST_LEN + 16) + +typedef struct { + char host[HOST_LEN]; + uint32_t port; +} Addr; + +typedef struct { + Addr me; + Addr peers[MAX_PEERS]; + int peersCount; + char dir[DIR_LEN]; + char dataDir[DIR_LEN + HOST_LEN * 2]; +} SRaftServerConfig; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_COMMON_H diff --git a/contrib/test/traft/help.txt b/contrib/test/traft/help.txt new file mode 100644 index 0000000000..7709e80e0a --- /dev/null +++ b/contrib/test/traft/help.txt @@ -0,0 +1,18 @@ + +make raftServer + +all: + gcc raftMain.c raftServer.c -I ../../traft/include/ ../../traft/.libs/libraft.a -o raftMain -luv -llz4 -lpthread -g +clean: + rm -f raftMain + sh clear.sh + + +make traft: + +sudo apt-get install libuv1-dev liblz4-dev +autoreconf -i +./configure --enable-example +make + + diff --git a/contrib/test/traft/raftMain.c b/contrib/test/traft/raftMain.c new file mode 100644 index 0000000000..24ad93856c --- /dev/null +++ b/contrib/test/traft/raftMain.c @@ -0,0 +1,659 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "raftServer.h" +#include "common.h" + +const char *exe_name; + +typedef struct LeaderState { + char address[48]; + int leaderCount; + +} LeaderState; + +#define NODE_COUNT 3 +LeaderState leaderStates[NODE_COUNT]; + +void printLeaderCount() { + for (int i = 0; i < NODE_COUNT; ++i) { + printf("%s: leaderCount:%d \n", leaderStates[i].address, leaderStates[i].leaderCount); + } +} + +void updateLeaderStates(SRaftServer *pRaftServer) { + for (int i = 0; i < pRaftServer->instance[0].raft.configuration.n; ++i) { + snprintf(leaderStates[i].address, sizeof(leaderStates[i].address), "%s", pRaftServer->instance[0].raft.configuration.servers[i].address); + leaderStates[i].leaderCount = 0; + } + + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + struct raft *r = &pRaftServer->instance[i].raft; + + char leaderAddress[128]; + memset(leaderAddress, 0, sizeof(leaderAddress)); + + if (r->state == RAFT_LEADER) { + snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address); + } else if (r->state == RAFT_FOLLOWER) { + snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address); + } + + for (int j = 0; j < NODE_COUNT; j++) { + if (strcmp(leaderAddress, leaderStates[j].address) == 0) { + leaderStates[j].leaderCount++; + } + } + } +} + + +void raftTransferCb(struct raft_transfer *req) { + SRaftServer *pRaftServer = req->data; + raft_free(req); + + printf("raftTransferCb: \n"); + updateLeaderStates(pRaftServer); + printLeaderCount(); + + int myLeaderCount; + for (int i = 0; i < NODE_COUNT; ++i) { + if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { + myLeaderCount = leaderStates[i].leaderCount; + } + } + + printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); + if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { + struct raft *r; + for (int j = 0; j < pRaftServer->instanceCount; ++j) { + if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { + r = &pRaftServer->instance[j].raft; + } + } + + struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); + transfer->data = pRaftServer; + + uint64_t destRaftId; + int minIndex = -1; + int minLeaderCount = myLeaderCount; + for (int j = 0; j < NODE_COUNT; ++j) { + if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue; + if (leaderStates[j].leaderCount <= minLeaderCount) { + minIndex = j; + } + } + + char myHost[48]; + uint16_t myPort; + uint16_t myVid; + decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); + + char *destAddress = leaderStates[minIndex].address; + + char tokens[MAX_PEERS][MAX_TOKEN_LEN]; + splitString(destAddress, ":", tokens, 2); + char *destHost = tokens[0]; + uint16_t destPort = atoi(tokens[1]); + destRaftId = encodeRaftId(destHost, destPort, myVid); + + raft_transfer(r, transfer, destRaftId, raftTransferCb); + } + +} + + +void parseAddr(const char *addr, char *host, int len, uint32_t *port) { + char* tmp = (char*)malloc(strlen(addr) + 1); + strcpy(tmp, addr); + + char* context; + char* separator = ":"; + char* token = strtok_r(tmp, separator, &context); + if (token) { + snprintf(host, len, "%s", token); + } + + token = strtok_r(NULL, separator, &context); + if (token) { + sscanf(token, "%u", port); + } + + free(tmp); +} + +// only parse 3 tokens +int parseCommand3(const char* str, char* token1, char* token2, char* token3, int len) +{ + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + + char* context; + char* separator = " "; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token1, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token2, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token3, token, len); + n++; + } + +ret: + return n; + free(tmp); +} + +// only parse 4 tokens +int parseCommand4(const char* str, char* token1, char* token2, char* token3, char *token4, int len) +{ + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + + char* context; + char* separator = " "; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token1, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token2, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token3, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token4, token, len); + n++; + } + +ret: + return n; + free(tmp); +} + +void *startServerFunc(void *param) { + SRaftServer *pServer = (SRaftServer*)param; + int32_t r = raftServerStart(pServer); + assert(r == 0); + + return NULL; +} + +// Console --------------------------------- +const char* state2String(unsigned short state) { + if (state == RAFT_UNAVAILABLE) { + return "RAFT_UNAVAILABLE"; + + } else if (state == RAFT_FOLLOWER) { + return "RAFT_FOLLOWER"; + + } else if (state == RAFT_CANDIDATE) { + return "RAFT_CANDIDATE"; + + } else if (state == RAFT_LEADER) { + return "RAFT_LEADER"; + + } + return "UNKNOWN_RAFT_STATE"; +} + + +void printRaftState2(struct raft *r) { + + char leaderAddress[128]; + memset(leaderAddress, 0, sizeof(leaderAddress)); + + if (r->state == RAFT_LEADER) { + snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address); + } else if (r->state == RAFT_FOLLOWER) { + snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address); + } + + for (int i = 0; i < r->configuration.n; ++i) { + char tmpAddress[128]; + snprintf(tmpAddress, sizeof(tmpAddress), "%s", r->configuration.servers[i].address); + + uint64_t raftId = r->configuration.servers[i].id; + char host[128]; + uint16_t port; + uint16_t vid; + decodeRaftId(raftId, host, 128, &port, &vid); + + char buf[512]; + memset(buf, 0, sizeof(buf)); + if (strcmp(tmpAddress, leaderAddress) == 0) { + snprintf(buf, sizeof(buf), "<%s:%u-%u-LEADER>\t", host, port, vid); + } else { + snprintf(buf, sizeof(buf), "<%s:%u-%u-FOLLOWER>\t", host, port, vid); + } + printf("%s", buf); + } + printf("\n"); +} + +void printRaftConfiguration(struct raft_configuration *c) { + printf("configuration: \n"); + for (int i = 0; i < c->n; ++i) { + printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address); + } +} + +void printRaftState(struct raft *r) { + printf("----Raft State: -----------\n"); + printf("mem_addr: %p \n", r); + printf("my_id: %llu \n", r->id); + printf("address: %s \n", r->address); + printf("current_term: %llu \n", r->current_term); + printf("voted_for: %llu \n", r->voted_for); + printf("role: %s \n", state2String(r->state)); + printf("commit_index: %llu \n", r->commit_index); + printf("last_applied: %llu \n", r->last_applied); + printf("last_stored: %llu \n", r->last_stored); + + printf("configuration_index: %llu \n", r->configuration_index); + printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); + printRaftConfiguration(&r->configuration); + + printf("----------------------------\n"); +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + raft_free(req); + struct raft *r = req->data; + if (status != 0) { + printf("putValueCb: %s \n", raft_errmsg(r)); + } else { + printf("putValueCb: %s \n", "ok"); + } +} + +void putValue(struct raft *r, const char *value) { + struct raft_buffer buf; + + buf.len = TOKEN_LEN;; + buf.base = raft_malloc(buf.len); + snprintf(buf.base, buf.len, "%s", value); + + struct raft_apply *req = raft_malloc(sizeof(struct raft_apply)); + req->data = r; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char*)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +void getValue(const char *key) { + char *ptr = getKV(key); + if (ptr) { + printf("get value: [%s] \n", ptr); + } else { + printf("value not found for key: [%s] \n", key); + } +} + +void console(SRaftServer *pRaftServer) { + while (1) { + char cmd_buf[COMMAND_LEN]; + memset(cmd_buf, 0, sizeof(cmd_buf)); + char *ret = fgets(cmd_buf, COMMAND_LEN, stdin); + if (!ret) { + exit(-1); + } + + int pos = strlen(cmd_buf); + if(cmd_buf[pos - 1] == '\n') { + cmd_buf[pos - 1] = '\0'; + } + + if (strncmp(cmd_buf, "", COMMAND_LEN) == 0) { + continue; + } + + char cmd[TOKEN_LEN]; + memset(cmd, 0, sizeof(cmd)); + + char param1[TOKEN_LEN]; + memset(param1, 0, sizeof(param1)); + + char param2[TOKEN_LEN]; + memset(param2, 0, sizeof(param2)); + + char param3[TOKEN_LEN]; + memset(param2, 0, sizeof(param2)); + + parseCommand4(cmd_buf, cmd, param1, param2, param3, TOKEN_LEN); + if (strcmp(cmd, "addnode") == 0) { + printf("not support \n"); + + /* + char host[HOST_LEN]; + uint32_t port; + parseAddr(param1, host, HOST_LEN, &port); + uint64_t rid = raftId(host, port); + + struct raft_change *req = raft_malloc(sizeof(*req)); + int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL); + if (r != 0) { + printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft)); + } + printf("add node: %lu %s \n", rid, param1); + + struct raft_change *req2 = raft_malloc(sizeof(*req2)); + r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL); + if (r != 0) { + printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft)); + } + */ + + } else if (strcmp(cmd, "dropnode") == 0) { + printf("not support \n"); + + } else if (strcmp(cmd, "rebalance") == 0) { + + /* + updateLeaderStates(pRaftServer); + + int myLeaderCount; + for (int i = 0; i < NODE_COUNT; ++i) { + if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { + myLeaderCount = leaderStates[i].leaderCount; + } + } + + while (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { + printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); + + struct raft *r; + for (int j = 0; j < pRaftServer->instanceCount; ++j) { + if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { + r = &pRaftServer->instance[j].raft; + } + } + + struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); + transfer->data = pRaftServer; + + uint64_t destRaftId; + int minIndex = -1; + int minLeaderCount = myLeaderCount; + for (int j = 0; j < NODE_COUNT; ++j) { + if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue; + + printf("-----leaderStates[%d].leaderCount:%d \n", j, leaderStates[j].leaderCount); + if (leaderStates[j].leaderCount <= minLeaderCount) { + minIndex = j; + printf("++++ assign minIndex : %d \n", minIndex); + } + } + + printf("minIndex:%d minLeaderCount:%d \n", minIndex, minLeaderCount); + + char myHost[48]; + uint16_t myPort; + uint16_t myVid; + decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); + + char *destAddress = leaderStates[minIndex].address; + + char tokens[MAX_PEERS][MAX_TOKEN_LEN]; + splitString(destAddress, ":", tokens, 2); + char *destHost = tokens[0]; + uint16_t destPort = atoi(tokens[1]); + destRaftId = encodeRaftId(destHost, destPort, myVid); + + printf("destHost:%s destPort:%u myVid:%u", destHost, destPort, myVid); + raft_transfer(r, transfer, destRaftId, raftTransferCb); + sleep(1); + + for (int i = 0; i < NODE_COUNT; ++i) { + if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { + myLeaderCount = leaderStates[i].leaderCount; + } + } + } + */ + + + int leaderCount = 0; + + struct raft *firstR; + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + struct raft *r = &pRaftServer->instance[i].raft; + if (r->state == RAFT_LEADER) { + leaderCount++; + firstR = r; + } + } + + if (leaderCount > pRaftServer->instanceCount / NODE_COUNT) { + struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); + transfer->data = pRaftServer; + raft_transfer(firstR, transfer, 0, raftTransferCb); + } + + + } else if (strcmp(cmd, "put") == 0) { + char buf[256]; + uint16_t vid; + sscanf(param1, "%hu", &vid); + snprintf(buf, sizeof(buf), "%s--%s", param2, param3); + putValue(&pRaftServer->instance[vid].raft, buf); + + } else if (strcmp(cmd, "get") == 0) { + getValue(param1); + + } else if (strcmp(cmd, "transfer") == 0) { + uint16_t vid; + sscanf(param1, "%hu", &vid); + + struct raft_transfer transfer; + raft_transfer(&pRaftServer->instance[vid].raft, &transfer, 0, NULL); + + + } else if (strcmp(cmd, "state") == 0) { + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + printf("instance %d: ", i); + printRaftState(&pRaftServer->instance[i].raft); + } + + } else if (strcmp(cmd, "state2") == 0) { + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + printRaftState2(&pRaftServer->instance[i].raft); + } + + } else if (strcmp(cmd, "snapshot") == 0) { + printf("not support \n"); + + } else if (strcmp(cmd, "help") == 0) { + printf("addnode \"127.0.0.1:8888\" \n"); + printf("dropnode \"127.0.0.1:8888\" \n"); + printf("put key value \n"); + printf("get key \n"); + printf("state \n"); + + } else { + printf("unknown command: [%s], type \"help\" to see help \n", cmd); + } + + //printf("cmd_buf: [%s] \n", cmd_buf); + } +} + +void *startConsoleFunc(void *param) { + SRaftServer *pServer = (SRaftServer*)param; + console(pServer); + return NULL; +} + +// Config --------------------------------- +void usage() { + printf("\nusage: \n"); + printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name); + printf("\n"); + printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name); + printf("\n"); +} + +void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { + memset(pConf, 0, sizeof(*pConf)); + + int option_index, option_value; + option_index = 0; + static struct option long_options[] = { + {"help", no_argument, NULL, 'h'}, + {"peers", required_argument, NULL, 'p'}, + {"me", required_argument, NULL, 'm'}, + {"dir", required_argument, NULL, 'd'}, + {NULL, 0, NULL, 0} + }; + + while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) { + switch (option_value) { + case 'm': { + parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); + break; + } + + case 'p': { + char tokens[MAX_PEERS][MAX_TOKEN_LEN]; + int peerCount = splitString(optarg, ",", tokens, MAX_PEERS); + pConf->peersCount = peerCount; + for (int i = 0; i < peerCount; ++i) { + Addr *pAddr = &pConf->peers[i]; + parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port); + } + break; + } + + + case 'd': { + snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg); + break; + } + + case 'h': { + usage(); + exit(-1); + } + + default: { + usage(); + exit(-1); + } + } + } + snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s_%u", pConf->dir, pConf->me.host, pConf->me.port); +} + +void printConf(SRaftServerConfig *pConf) { + printf("\nconf: \n"); + printf("me: %s:%u \n", pConf->me.host, pConf->me.port); + printf("peersCount: %d \n", pConf->peersCount); + for (int i = 0; i < pConf->peersCount; ++i) { + Addr *pAddr = &pConf->peers[i]; + printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port); + } + printf("dataDir: %s \n\n", pConf->dataDir); + +} + + +int main(int argc, char **argv) { + srand(time(NULL)); + int32_t ret; + + exe_name = argv[0]; + if (argc < 3) { + usage(); + exit(-1); + } + + SRaftServerConfig conf; + parseConf(argc, argv, &conf); + printConf(&conf); + + signal(SIGPIPE, SIG_IGN); + + /* + char cmd_buf[COMMAND_LEN]; + snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir); + system(cmd_buf); + */ + + + struct raft_fsm fsm; + initFsm(&fsm); + + SRaftServer raftServer; + ret = raftServerInit(&raftServer, &conf, &fsm); + assert(ret == 0); + + pthread_t tidRaftServer; + pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer); + + pthread_t tidConsole; + pthread_create(&tidConsole, NULL, startConsoleFunc, &raftServer); + + while (1) { + sleep(10); + } + + return 0; +} diff --git a/contrib/test/traft/raftServer.c b/contrib/test/traft/raftServer.c new file mode 100644 index 0000000000..94de49cd0f --- /dev/null +++ b/contrib/test/traft/raftServer.c @@ -0,0 +1,222 @@ +#include +#include +#include "common.h" +#include "raftServer.h" + +char *keys; +char *values; + +void initStore() { + keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + writeIndex = 0; +} + +void destroyStore() { + free(keys); + free(values); +} + +void putKV(const char *key, const char *value) { + if (writeIndex < MAX_RECORD_COUNT) { + strncpy(&keys[writeIndex], key, MAX_KV_LEN); + strncpy(&values[writeIndex], value, MAX_KV_LEN); + writeIndex++; + } +} + +char *getKV(const char *key) { + for (int i = 0; i < MAX_RECORD_COUNT; ++i) { + if (strcmp(&keys[i], key) == 0) { + return &values[i]; + } + } + return NULL; +} + + +int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr) +{ + if (n_arr <= 0) { + return -1; + } + + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + char* context; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + strncpy(arr[n], token, MAX_TOKEN_LEN); + n++; + + while (1) { + token = strtok_r(NULL, separator, &context); + if (!token || n >= n_arr) { + goto ret; + } + strncpy(arr[n], token, MAX_TOKEN_LEN); + n++; + } + +ret: + free(tmp); + return n; +} + +/* +uint64_t raftId(const char *host, uint32_t port) { + uint32_t host_uint32 = (uint32_t)inet_addr(host); + assert(host_uint32 != (uint32_t)-1); + uint64_t code = ((uint64_t)host_uint32) << 32 | port; + return code; +} +*/ + + +/* +uint64_t encodeRaftId(const char *host, uint16_t port, uint16_t vid) { + uint64_t raftId; + uint32_t host_uint32 = (uint32_t)inet_addr(host); + assert(host_uint32 != (uint32_t)-1); + + raftId = (((uint64_t)host_uint32) << 32) | (((uint32_t)port) << 16) | vid; + return raftId; +} + +void decodeRaftId(uint64_t raftId, char *host, int32_t len, uint16_t *port, uint16_t *vid) { + uint32_t host32 = (uint32_t)((raftId >> 32) & 0x00000000FFFFFFFF); + + struct in_addr addr; + addr.s_addr = host32; + snprintf(host, len, "%s", inet_ntoa(addr)); + + *port = (uint16_t)((raftId & 0x00000000FFFF0000) >> 16); + *vid = (uint16_t)(raftId & 0x000000000000FFFF); +} +*/ + + + + +int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) { + int ret; + + snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host); + pRaftServer->port = pConf->me.port; + snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port); + //strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir)); + + ret = uv_loop_init(&pRaftServer->loop); + if (ret != 0) { + fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret)); + assert(0); + } + + ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); + if (ret != 0) { + fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret); + assert(0); + } + + + uint16_t vid; + pRaftServer->instanceCount = 20; + + + for (int i = 0; i < pRaftServer->instanceCount; ++i) + { + //vid = 0; + vid = i; + + + pRaftServer->instance[vid].raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); + snprintf(pRaftServer->instance[vid].dir, sizeof(pRaftServer->instance[vid].dir), "%s_%llu", pConf->dataDir, pRaftServer->instance[vid].raftId); + + char cmd_buf[COMMAND_LEN]; + snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", pRaftServer->instance[vid].dir); + system(cmd_buf); + sleep(1); + + pRaftServer->instance[vid].fsm = pFsm; + + ret = raft_uv_init(&pRaftServer->instance[vid].io, &pRaftServer->loop, pRaftServer->instance[vid].dir, &pRaftServer->transport); + if (ret != 0) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft)); + assert(0); + } + + ret = raft_init(&pRaftServer->instance[vid].raft, &pRaftServer->instance[vid].io, pRaftServer->instance[vid].fsm, pRaftServer->instance[vid].raftId, pRaftServer->address); + if (ret != 0) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft)); + assert(0); + } + + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, pRaftServer->instance[vid].raftId, pRaftServer->address, RAFT_VOTER); + printf("add myself: %llu - %s \n", pRaftServer->instance[vid].raftId, pRaftServer->address); + for (int i = 0; i < pConf->peersCount; ++i) { + const Addr *pAddr = &pConf->peers[i]; + + raft_id rid = encodeRaftId(pAddr->host, pAddr->port, vid); + + char addrBuf[ADDRESS_LEN]; + snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port); + raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER); + printf("add peers: %llu - %s \n", rid, addrBuf); + } + + raft_bootstrap(&pRaftServer->instance[vid].raft, &conf); + + } + + + + + + + + return 0; +} + +int32_t raftServerStart(SRaftServer *pRaftServer) { + int ret; + + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + ret = raft_start(&pRaftServer->instance[i].raft); + if (ret != 0) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[i].raft)); + } + + } + + + uv_run(&pRaftServer->loop, UV_RUN_DEFAULT); +} + + +void raftServerClose(SRaftServer *pRaftServer) { + +} + + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { + char *msg = (char*)buf->base; + printf("fsm apply: %s \n", msg); + + char arr[2][MAX_TOKEN_LEN]; + splitString(msg, "--", arr, 2); + putKV(arr[0], arr[1]); + + return 0; +} + +int32_t initFsm(struct raft_fsm *fsm) { + initStore(); + fsm->apply = fsmApplyCb; + return 0; +} diff --git a/contrib/test/traft/raftServer.h b/contrib/test/traft/raftServer.h new file mode 100644 index 0000000000..b1f62caac5 --- /dev/null +++ b/contrib/test/traft/raftServer.h @@ -0,0 +1,68 @@ +#ifndef TDENGINE_RAFT_SERVER_H +#define TDENGINE_RAFT_SERVER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include "raft.h" +#include "raft/uv.h" +#include "common.h" + + +// simulate a db store, just for test +#define MAX_KV_LEN 100 +#define MAX_RECORD_COUNT 500 +char *keys; +char *values; +int writeIndex; + +void initStore(); +void destroyStore(); +void putKV(const char *key, const char *value); +char *getKV(const char *key); + +typedef struct { + char dir[DIR_LEN + HOST_LEN * 4]; /* Data dir of UV I/O backend */ + raft_id raftId; /* For vote */ + struct raft_fsm *fsm; /* Sample application FSM */ + struct raft raft; /* Raft instance */ + struct raft_io io; /* UV I/O backend */ + +} SInstance; + +typedef struct { + char host[HOST_LEN]; + uint32_t port; + char address[ADDRESS_LEN]; /* Raft instance address */ + + struct uv_loop_s loop; /* UV loop */ + struct raft_uv_transport transport; /* UV I/O backend transport */ + + SInstance instance[MAX_INSTANCE_NUM]; + int32_t instanceCount; + +} SRaftServer; + +#define MAX_TOKEN_LEN 32 +int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr); + +int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm); +int32_t raftServerStart(SRaftServer *pRaftServer); +void raftServerClose(SRaftServer *pRaftServer); + + +int initFsm(struct raft_fsm *fsm); + + + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RAFT_SERVER_H -- GitLab