提交 3f3fb4a2 编写于 作者: M Minghao Li

add node by manual operation

上级 829fd712
......@@ -20,6 +20,7 @@ typedef struct {
} Addr;
typedef struct {
int voter;
Addr me;
Addr peers[MAX_PEERS];
int peersCount;
......
......@@ -104,7 +104,7 @@ const char* state2String(unsigned short state) {
void printRaftConfiguration(struct raft_configuration *c) {
printf("configuration: \n");
for (int i = 0; i < c->n; ++i) {
printf("%llu -- %d -- %s\n", c->servers->id, c->servers->role, c->servers->address);
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
}
}
......@@ -119,11 +119,9 @@ void printRaftState(struct raft *r) {
printf("last_applied: %llu \n", r->last_applied);
printf("last_stored: %llu \n", r->last_stored);
/*
printf("configuration_index: %llu \n", r->configuration_index);
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
printRaftConfiguration(&r->configuration);
*/
printf("----------------------------\n");
}
......@@ -164,6 +162,18 @@ void getValue(const char *key) {
}
}
void raft_change_cb_add(struct raft_change *req, int status) {
printf("raft_change_cb_add status:%d ... \n", status);
}
void raft_change_cb_assign(struct raft_change *req, int status) {
printf("raft_change_cb_assign status:%d ... \n", status);
}
void raft_change_cb_remove(struct raft_change *req, int status) {
printf("raft_change_cb_remove status:%d ... \n", status);
}
void console(SRaftServer *pRaftServer) {
while (1) {
char cmd_buf[COMMAND_LEN];
......@@ -193,30 +203,59 @@ void console(SRaftServer *pRaftServer) {
parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN);
if (strcmp(cmd, "addnode") == 0) {
printf("not support \n");
//printf("not support \n");
/*
char host[HOST_LEN];
uint32_t port;
parseAddr(param1, host, HOST_LEN, &port);
uint64_t rid = raftId(host, port);
struct raft_change *req = raft_malloc(sizeof(*req));
int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL);
int r = raft_add(&pRaftServer->raft, req, rid, param1, raft_change_cb_add);
if (r != 0) {
printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft));
printf("raft_add error: %s \n", raft_errmsg(&pRaftServer->raft));
}
printf("add node: %lu %s \n", rid, param1);
struct raft_change *req2 = raft_malloc(sizeof(*req2));
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL);
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign);
if (r != 0) {
printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft));
}
printf("raft_assign: %s %d \n", param1, RAFT_VOTER);
} else if (strcmp(cmd, "activate") == 0) {
char host[HOST_LEN];
uint32_t port;
parseAddr(param1, host, HOST_LEN, &port);
uint64_t rid = raftId(host, port);
struct raft_change *req2 = raft_malloc(sizeof(*req2));
int r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign);
if (r != 0) {
printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft));
printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft));
}
*/
printf("raft_assign: %s %d \n", param1, RAFT_VOTER);
} else if (strcmp(cmd, "dropnode") == 0) {
printf("not support \n");
char host[HOST_LEN];
uint32_t port;
parseAddr(param1, host, HOST_LEN, &port);
uint64_t rid = raftId(host, port);
struct raft_change *req = raft_malloc(sizeof(*req));
int r = raft_remove(&pRaftServer->raft, req, rid, raft_change_cb_remove);
if (r != 0) {
printf("raft_remove: %s \n", raft_errmsg(&pRaftServer->raft));
}
printf("drop node: %lu %s \n", rid, param1);
} else if (strcmp(cmd, "put") == 0) {
char buf[256];
......@@ -234,6 +273,7 @@ void console(SRaftServer *pRaftServer) {
} else if (strcmp(cmd, "help") == 0) {
printf("addnode \"127.0.0.1:8888\" \n");
printf("activate \"127.0.0.1:8888\" \n");
printf("dropnode \"127.0.0.1:8888\" \n");
printf("put key value \n");
printf("get key \n");
......@@ -256,7 +296,9 @@ void *startConsoleFunc(void *param) {
// Config ---------------------------------
void usage() {
printf("\nusage: \n");
printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name);
printf("%s --me=127.0.0.1:10000 --dir=./data --voter \n", exe_name);
printf("%s --me=127.0.0.1:10001 --dir=./data \n", exe_name);
printf("%s --me=127.0.0.1:10002 --dir=./data \n", exe_name);
printf("\n");
printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name);
printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name);
......@@ -271,13 +313,15 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
option_index = 0;
static struct option long_options[] = {
{"help", no_argument, NULL, 'h'},
{"voter", no_argument, NULL, 'v'},
{"peers", required_argument, NULL, 'p'},
{"me", required_argument, NULL, 'm'},
{"dir", required_argument, NULL, 'd'},
{NULL, 0, NULL, 0}
};
while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) {
pConf->voter = 0;
while ((option_value = getopt_long(argc, argv, "hvp:m:d:", long_options, &option_index)) != -1) {
switch (option_value) {
case 'm': {
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
......@@ -295,6 +339,10 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
break;
}
case 'v': {
pConf->voter = 1;
break;
}
case 'd': {
snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg);
......@@ -338,6 +386,8 @@ int main(int argc, char **argv) {
exit(-1);
}
signal(SIGPIPE, SIG_IGN);
SRaftServerConfig conf;
parseConf(argc, argv, &conf);
printConf(&conf);
......
......@@ -85,29 +85,45 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf,
pRaftServer->fsm = pFsm;
ret = uv_loop_init(&pRaftServer->loop);
if (!ret) {
if (ret != 0) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
assert(0);
}
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
if (!ret) {
if (ret != 0) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
assert(0);
}
ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport);
if (!ret) {
if (ret != 0) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
assert(0);
}
ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address);
if (!ret) {
if (ret != 0) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
assert(0);
}
struct raft_configuration conf;
raft_configuration_init(&conf);
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER);
if (pConf->voter == 0) {
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_SPARE);
} else {
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER);
}
printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address);
for (int i = 0; i < pConf->peersCount; ++i) {
const Addr *pAddr = &pConf->peers[i];
raft_id rid = raftId(pAddr->host, pAddr->port);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册