/* raftServer.c */
#include <stdlib.h>
#include "common.h"
M
Minghao Li 已提交
3 4
#include "raftServer.h"

5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
/* Backing buffer for stored keys; allocated by initStore() with
 * MAX_RECORD_COUNT * MAX_KV_LEN bytes and released by destroyStore(). */
char *keys;
/* Backing buffer for stored values; same size and lifetime as `keys`. */
char *values;

/*
 * Allocate the key and value buffers of the in-memory store and reset the
 * write position to the first record.
 *
 * Both buffers hold MAX_RECORD_COUNT * MAX_KV_LEN bytes and are zero-filled,
 * so every byte is a valid empty C string before anything is stored.
 * The function cannot report failure to its caller, so running out of memory
 * is treated as fatal: a message is printed to stderr and the process aborts.
 */
void initStore() {
	/* calloc zero-fills and also guards the count * size multiplication. */
	keys = calloc(MAX_RECORD_COUNT, MAX_KV_LEN);
	values = calloc(MAX_RECORD_COUNT, MAX_KV_LEN);
	if (keys == NULL || values == NULL) {
		fprintf(stderr, "initStore: out of memory \n");
		abort();
	}
	writeIndex = 0;
}

void destroyStore() {
	free(keys);
	free(values);
}

/*
 * Append a key/value record to the in-memory store.
 *
 * Record i occupies the MAX_KV_LEN bytes starting at offset i * MAX_KV_LEN in
 * `keys` and `values`. Strings longer than MAX_KV_LEN - 1 characters are
 * truncated; every stored string is NUL-terminated.
 *
 * The record is silently dropped when the store is full, when the store has
 * not been allocated, or when either argument is NULL.
 */
void putKV(const char *key, const char *value) {
	if (key == NULL || value == NULL || keys == NULL || values == NULL) {
		return;
	}
	if (writeIndex >= MAX_RECORD_COUNT) {
		return;
	}

	/* Scale the record index to a byte offset so records do not overlap. */
	size_t offset = (size_t)writeIndex * MAX_KV_LEN;
	char *keySlot = &keys[offset];
	char *valueSlot = &values[offset];

	/* strncpy does not terminate on truncation, so terminate explicitly. */
	strncpy(keySlot, key, MAX_KV_LEN - 1);
	keySlot[MAX_KV_LEN - 1] = '\0';
	strncpy(valueSlot, value, MAX_KV_LEN - 1);
	valueSlot[MAX_KV_LEN - 1] = '\0';

	writeIndex++;
}

/*
 * Look up the value stored for `key`.
 *
 * Only records that have actually been written (indices below writeIndex)
 * are examined. The scan runs from the newest record to the oldest so that
 * the most recent putKV() for a key wins.
 *
 * Returns a pointer into the store's value buffer (owned by the store; do not
 * free), or NULL when the key is absent, `key` is NULL, or the store has not
 * been allocated.
 */
char *getKV(const char *key) {
	if (key == NULL || keys == NULL || values == NULL) {
		return NULL;
	}

	for (size_t remaining = (size_t)writeIndex; remaining > 0; --remaining) {
		size_t offset = (remaining - 1) * MAX_KV_LEN;
		if (strcmp(&keys[offset], key) == 0) {
			return &values[offset];
		}
	}
	return NULL;
}


int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr)
{
    if (n_arr <= 0) {
    	return -1;
    }

    char* tmp = (char*)malloc(strlen(str) + 1);
    strcpy(tmp, str);
    char* context;
    int n = 0;

    char* token = strtok_r(tmp, separator, &context);
    if (!token) {
    	goto ret;
    }
    strncpy(arr[n], token, MAX_TOKEN_LEN);
    n++;

    while (1) {
    token = strtok_r(NULL, separator, &context);
    	if (!token || n >= n_arr) {
    	    goto ret;
    	}
    	strncpy(arr[n], token, MAX_TOKEN_LEN);
    	n++;
    }

ret:
    free(tmp);
    return n;
}

M
Minghao Li 已提交
69 70 71 72 73 74 75
/*
 * Build a 64-bit server id from an IPv4 address string and a port.
 *
 * The upper 32 bits hold the value returned by inet_addr(host); the lower
 * 32 bits hold `port`. An assertion rejects hosts for which inet_addr()
 * returns (uint32_t)-1.
 */
uint64_t raftId(const char *host, uint32_t port) {
    const uint32_t ipv4 = (uint32_t)inet_addr(host);
    assert(ipv4 != (uint32_t)-1);

    uint64_t id = (uint64_t)ipv4;
    id <<= 32;
    id |= port;
    return id;
}

76
/*
 * Initialize a raft server from its configuration.
 *
 * Steps, in order:
 *   1. copy host, port, "host:port" address and data directory into
 *      pRaftServer and derive the raft id from host and port;
 *   2. initialize the libuv loop, the raft TCP transport, the raft I/O
 *      backend and the raft instance itself;
 *   3. build a configuration containing this server and all configured peers
 *      (all as RAFT_VOTER) and bootstrap the raft instance with it.
 *
 * Returns 0 on success, -1 when an argument is NULL, or the non-zero error
 * code of the first libuv/raft call that failed (a message is printed to
 * stderr). On failure, resources initialized by earlier steps are not torn
 * down by this function.
 */
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) {
	int ret;

	if (pRaftServer == NULL || pConf == NULL || pFsm == NULL) {
		return -1;
	}

	snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host);
	pRaftServer->port = pConf->me.port;
	snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port);
	/* snprintf always NUL-terminates, unlike strncpy on an over-long path. */
	snprintf(pRaftServer->dir, sizeof(pRaftServer->dir), "%s", pConf->dataDir);

	pRaftServer->raftId = raftId(pRaftServer->host, pRaftServer->port);
	pRaftServer->fsm = pFsm;

	/* All calls below return 0 on success. Until raft_init() has run, the
	 * raft object holds no error message, so the error code is translated
	 * directly instead of calling raft_errmsg(). */
	ret = uv_loop_init(&pRaftServer->loop);
	if (ret != 0) {
		fprintf(stderr, "%s \n", uv_strerror(ret));
		return ret;
	}

	ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
	if (ret != 0) {
		fprintf(stderr, "%s \n", raft_strerror(ret));
		return ret;
	}

	ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport);
	if (ret != 0) {
		fprintf(stderr, "%s \n", raft_strerror(ret));
		return ret;
	}

	ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address);
	if (ret != 0) {
		fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
		return ret;
	}

	struct raft_configuration conf;
	raft_configuration_init(&conf);

	ret = raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER);
	if (ret != 0) {
		fprintf(stderr, "%s \n", raft_strerror(ret));
		raft_configuration_close(&conf);
		return ret;
	}
	printf("add myself: %llu - %s \n", (unsigned long long)pRaftServer->raftId, pRaftServer->address);

	for (int i = 0; i < pConf->peersCount; ++i) {
		const Addr *pAddr = &pConf->peers[i];
		raft_id rid = raftId(pAddr->host, pAddr->port);
		char addrBuf[ADDRESS_LEN];
		snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port);

		ret = raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER);
		if (ret != 0) {
			fprintf(stderr, "%s \n", raft_strerror(ret));
			raft_configuration_close(&conf);
			return ret;
		}
		printf("add peers: %llu - %s \n", (unsigned long long)rid, addrBuf);
	}

	ret = raft_bootstrap(&pRaftServer->raft, &conf);
	/* The configuration object is only needed for the bootstrap call. */
	raft_configuration_close(&conf);

	/* NOTE(review): per the libraft documentation RAFT_CANTBOOTSTRAP means
	 * the server already has persisted state (e.g. a restart), which is not
	 * treated as an error here - confirm against the libraft version in use. */
	if (ret != 0 && ret != RAFT_CANTBOOTSTRAP) {
		fprintf(stderr, "%s \n", raft_strerror(ret));
		return ret;
	}

	return 0;
}

/*
 * Start the raft instance and run the server's libuv event loop.
 *
 * uv_run() is called with UV_RUN_DEFAULT, so this function blocks until the
 * loop stops.
 *
 * Returns the non-zero raft error code if raft_start() fails (the loop is
 * then not run and a message is printed to stderr), otherwise 0 once the
 * loop has returned.
 */
int32_t raftServerStart(SRaftServer *pRaftServer) {
	int ret = raft_start(&pRaftServer->raft);
	if (ret != 0) {
		fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
		return ret;
	}

	uv_run(&pRaftServer->loop, UV_RUN_DEFAULT);
	return 0;
}


/*
 * Shut down a raft server.
 *
 * Currently a stub: it performs no work and leaves pRaftServer untouched.
 */
void raftServerClose(SRaftServer *pRaftServer) {
	(void)pRaftServer;
}


M
Minghao Li 已提交
141 142
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) {
    char *msg = (char*)buf->base;
143 144 145 146 147
    printf("fsm apply: %s \n", msg);

	char arr[2][MAX_TOKEN_LEN];
	splitString(msg, "--", arr, 2);
	putKV(arr[0], arr[1]);
M
Minghao Li 已提交
148 149 150 151 152

    return 0;
}

int32_t initFsm(struct raft_fsm *fsm) {
153
	initStore();
M
Minghao Li 已提交
154
	fsm->apply = fsmApplyCb;
M
Minghao Li 已提交
155 156
	return 0;
}