提交 f7cd7554 编写于 作者: M Minghao Li

sync raft store

上级 6fcbca5b
......@@ -34,11 +34,11 @@ extern "C" {
typedef struct SSyncIO {
STaosQueue *pMsgQ;
STaosQset *pQset;
STaosQset * pQset;
pthread_t consumerTid;
void *serverRpc;
void *clientRpc;
void * serverRpc;
void * clientRpc;
SEpSet myAddr;
tmr_h qTimer;
......
......@@ -43,13 +43,14 @@ int32_t raftStorePersist(SRaftStore *pRaftStore);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
bool raftStoreHasVoted(SRaftStore *pRaftStore);
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
void raftStoreClearVote(SRaftStore *pRaftStore);
void raftStoreNextTerm(SRaftStore *pRaftStore);
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
cJSON *raftStore2Json(SRaftStore *pRaftStore);
char *raftStore2Str(SRaftStore *pRaftStore);
bool raftStoreHasVoted(SRaftStore *pRaftStore);
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
void raftStoreClearVote(SRaftStore *pRaftStore);
void raftStoreNextTerm(SRaftStore *pRaftStore);
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson);
cJSON * raftStore2Json(SRaftStore *pRaftStore);
char * raftStore2Str(SRaftStore *pRaftStore);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj);
......
......@@ -50,6 +50,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
}
int32_t syncNodeElect(SSyncNode* pSyncNode) {
int32_t ret = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollower2Candidate(pSyncNode);
}
......@@ -62,7 +63,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm);
syncNodeVoteForSelf(pSyncNode);
int32_t ret = syncNodeRequestVotePeers(pSyncNode);
ret = syncNodeRequestVotePeers(pSyncNode);
assert(ret == 0);
syncNodeResetElectTimer(pSyncNode);
......
......@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static void *syncIOConsumerFunc(void *param);
static void * syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
......@@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param;
SSyncIO * io = param;
STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg;
SRpcMsg * pRpcMsg, rpcMsg;
qall = taosAllocateQall();
while (1) {
......
......@@ -96,7 +96,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -103,6 +103,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode));
if (taosMkDir(pSyncInfo->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
return NULL;
}
// init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->syncCfg = pSyncInfo->syncCfg;
......@@ -200,6 +206,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
// start raft
syncNodeBecomeFollower(pSyncNode);
return pSyncNode;
}
......
......@@ -97,16 +97,32 @@ int32_t raftStorePersist(SRaftStore *pRaftStore) {
return 0;
}
static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; }
static bool raftStoreFileExist(char *path) {
bool b = taosStatFile(path, NULL, NULL) >= 0;
return b;
}
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
assert(pRaftStore != NULL);
cJSON *pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm);
cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr);
char u64Buf[128];
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "current_term", u64Buf);
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
uint64_t u64 = pRaftStore->voteFor.addr;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pRoot, "addr_host", host);
cJSON_AddNumberToObject(pRoot, "addr_port", port);
char *serialized = cJSON_Print(pRoot);
int len2 = strlen(serialized);
assert(len2 < len);
......@@ -125,10 +141,12 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pRoot = cJSON_Parse(buf);
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
pRaftStore->currentTerm = pCurrentTerm->valueint;
assert(cJSON_IsString(pCurrentTerm));
sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm));
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
pRaftStore->voteFor.addr = pVoteForAddr->valueint;
assert(cJSON_IsString(pVoteForAddr));
sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr));
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
......@@ -139,11 +157,10 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
bool raftStoreHasVoted(SRaftStore *pRaftStore) {
bool b = syncUtilEmptyId(&(pRaftStore->voteFor));
return b;
return (!b);
}
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) {
assert(!raftStoreHasVoted(pRaftStore));
assert(!syncUtilEmptyId(pRaftId));
pRaftStore->voteFor = *pRaftId;
raftStorePersist(pRaftStore);
......@@ -164,6 +181,8 @@ void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
raftStorePersist(pRaftStore);
}
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; }
cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char u64buf[128];
cJSON *pRoot = cJSON_CreateObject();
......@@ -185,6 +204,9 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
}
cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId);
cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor);
int hasVoted = raftStoreHasVoted(pRaftStore);
cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted);
}
cJSON *pJson = cJSON_CreateObject();
......
......@@ -116,7 +116,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char *voteGranted2Str(SVotesGranted *pVotesGranted) {
cJSON *pJson = voteGranted2Json(pVotesGranted);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -238,7 +238,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char *votesRespond2Str(SVotesRespond *pVotesRespond) {
cJSON *pJson = votesRespond2Json(pVotesRespond);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -27,6 +27,7 @@ add_executable(syncPingReplyTest "")
add_executable(syncRpcMsgTest "")
add_executable(syncPingTimerTest2 "")
add_executable(syncPingSelfTest "")
add_executable(syncElectTest "")
target_sources(syncTest
......@@ -145,6 +146,10 @@ target_sources(syncPingSelfTest
PRIVATE
"syncPingSelfTest.cpp"
)
target_sources(syncElectTest
PRIVATE
"syncElectTest.cpp"
)
target_include_directories(syncTest
......@@ -292,6 +297,16 @@ target_include_directories(syncPingSelfTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncElectTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncElectTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -410,6 +425,10 @@ target_link_libraries(syncPingSelfTest
sync
gtest_main
)
target_link_libraries(syncElectTest
sync
gtest_main
)
enable_testing()
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 1;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SWal* pWal;
SSyncNode* gSyncNode;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./elect_test");
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
pWal = walOpen("./elect_test_wal", &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
SSyncNode* syncInitTest() { return syncNodeInit(); }
void initRaftId(SSyncNode* pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
gSyncNode = syncInitTest();
assert(gSyncNode != NULL);
syncNodePrint2((char*)"", gSyncNode);
initRaftId(gSyncNode);
//---------------------------
while (1) {
sTrace("while 1 sleep, state: %d, %s", gSyncNode->state, syncUtilState2String(gSyncNode->state));
taosMsleep(1000);
}
return 0;
}
......@@ -3,6 +3,7 @@
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
......@@ -13,6 +14,21 @@ void logTest() {
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 5;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
void initRaftId() {
for (int i = 0; i < replicaNum; ++i) {
ids[i].addr = syncUtilAddr2U64("127.0.0.1", ports[i]);
ids[i].vgId = 1234;
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
......@@ -20,23 +36,35 @@ int main() {
logTest();
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
initRaftId();
SRaftStore* pRaftStore = raftStoreOpen("./test_raft_store.json");
assert(pRaftStore != NULL);
raftStorePrint(pRaftStore);
#if 0
pRaftStore->currentTerm = 100;
pRaftStore->voteFor.addr = 200;
pRaftStore->voteFor.vgId = 300;
raftStorePersist(pRaftStore);
raftStorePrint(pRaftStore);
#endif
++(pRaftStore->currentTerm);
++(pRaftStore->voteFor.addr);
++(pRaftStore->voteFor.vgId);
raftStorePersist(pRaftStore);
raftStorePrint(pRaftStore);
raftStorePrint2((char*)"==raftStoreOpen==", pRaftStore);
raftStoreSetTerm(pRaftStore, 100);
raftStorePrint2((char*)"==raftStoreSetTerm==", pRaftStore);
raftStoreVote(pRaftStore, &ids[0]);
raftStorePrint2((char*)"==raftStoreVote==", pRaftStore);
raftStoreClearVote(pRaftStore);
raftStorePrint2((char*)"==raftStoreClearVote==", pRaftStore);
raftStoreVote(pRaftStore, &ids[1]);
raftStorePrint2((char*)"==raftStoreVote==", pRaftStore);
raftStoreNextTerm(pRaftStore);
raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore);
raftStoreNextTerm(pRaftStore);
raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore);
raftStoreNextTerm(pRaftStore);
raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore);
raftStoreNextTerm(pRaftStore);
raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册