From 2ae6f747f918cecad4d6eeebbaaf35ad4c47ffea Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 8 Mar 2022 17:07:29 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncVoteMgr.h | 10 ++- source/libs/sync/src/syncElection.c | 17 ++++ source/libs/sync/src/syncMain.c | 4 +- source/libs/sync/src/syncVoteMgr.c | 70 +++++++++++++++- source/libs/sync/test/CMakeLists.txt | 14 ++++ .../libs/sync/test/syncVotesGrantedTest.cpp | 80 +++++++++++++++++++ 6 files changed, 188 insertions(+), 7 deletions(-) create mode 100644 source/libs/sync/test/syncVotesGrantedTest.cpp diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index e2307e9e66..a769bfbccd 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -28,10 +28,14 @@ extern "C" { #include "syncUtil.h" #include "taosdef.h" +// SVotesGranted ----------------------------- typedef struct SVotesGranted { + SRaftId (*replicas)[TSDB_MAX_REPLICA]; + int32_t replicaNum; + bool isGranted[TSDB_MAX_REPLICA]; + int32_t votes; SyncTerm term; int32_t quorum; - int32_t votes; bool toLeader; SSyncNode *pSyncNode; } SVotesGranted; @@ -41,7 +45,10 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted); bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); +cJSON * voteGranted2Json(SVotesGranted *pVotesGranted); +char * voteGranted2Str(SVotesGranted *pVotesGranted); +// SVotesRespond ----------------------------- typedef struct SVotesRespond { SRaftId (*replicas)[TSDB_MAX_REPLICA]; bool isRespond[TSDB_MAX_REPLICA]; @@ -51,6 +58,7 @@ typedef struct SVotesRespond { } SVotesRespond; SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); +void votesRespondDestory(SVotesRespond *pVotesRespond); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void Reset(SVotesRespond *pVotesRespond, SyncTerm term); diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 24f41a0e69..223431336e 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -15,6 +15,7 @@ #include "syncElection.h" #include "syncMessage.h" +#include "syncRaftStore.h" // TLA+ Spec // RequestVote(i, j) == @@ -29,11 +30,27 @@ // /\ UNCHANGED <> // int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SyncRequestVote* pMsg = syncRequestVoteBuild(); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = pSyncNode->peersId[i]; + pMsg->currentTerm = pSyncNode->pRaftStore->currentTerm; + pMsg->lastLogIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); + pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore); + + ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); + assert(ret == 0); + syncRequestVoteDestroy(pMsg); + } return ret; } int32_t syncNodeElect(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + // start election int32_t ret = syncNodeRequestVotePeers(pSyncNode); return ret; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5338b5bd7f..ad424b9353 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -107,12 +107,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { syncUtilnodeInfo2raftId(&pSyncInfo->syncCfg.nodeInfo[i], pSyncInfo->vgId, &pSyncNode->replicasId[i]); } - // raft algorithm + // init raft algorithm pSyncNode->pFsm = pSyncInfo->pFsm; pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum); pSyncNode->leaderCache = EMPTY_RAFT_ID; - // life cycle + // init life cycle // init server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index c9f0ceab57..407b7b1941 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -14,15 +14,25 @@ */ #include "syncVoteMgr.h" +#include "syncUtil.h" + +// SVotesGranted ----------------------------- +static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { + memset(pVotesGranted->isGranted, 0, sizeof(pVotesGranted->isGranted)); + pVotesGranted->votes = 0; +} SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted)); assert(pVotesGranted != NULL); memset(pVotesGranted, 0, sizeof(SVotesGranted)); - pVotesGranted->quorum = pSyncNode->quorum; + pVotesGranted->replicas = &(pSyncNode->replicasId); + pVotesGranted->replicaNum = pSyncNode->replicaNum; + voteGrantedClearVotes(pVotesGranted); + pVotesGranted->term = 0; - pVotesGranted->votes = 0; + pVotesGranted->quorum = pSyncNode->quorum; pVotesGranted->toLeader = false; pVotesGranted->pSyncNode = pSyncNode; @@ -43,15 +53,61 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) { void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { assert(pMsg->voteGranted == true); assert(pMsg->term == pVotesGranted->term); - pVotesGranted->votes++; + assert(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)); + + int j = -1; + for (int i = 0; i < pVotesGranted->replicaNum; ++i) { + if (syncUtilSameId(&((*(pVotesGranted->replicas))[i]), &(pMsg->srcId))) { + j = i; + break; + } + } + assert(j != -1); + + if (pVotesGranted->isGranted[j] != true) { + ++(pVotesGranted->votes); + pVotesGranted->isGranted[j] = true; + } + assert(pVotesGranted->votes <= pVotesGranted->replicaNum); } void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { pVotesGranted->term = term; - pVotesGranted->votes = 0; + voteGrantedClearVotes(pVotesGranted); pVotesGranted->toLeader = false; } +cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { + char u64buf[128]; + cJSON *pRoot = cJSON_CreateObject(); + + cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum); + cJSON *pReplicas = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicas", pReplicas); + for (int i = 0; i < pVotesGranted->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas)[i]))); + } + cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes); + snprintf(u64buf, sizeof(u64buf), "%lu", pVotesGranted->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum); + cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader); + snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot); + return pJson; +} + +char *voteGranted2Str(SVotesGranted *pVotesGranted) { + cJSON *pJson = voteGranted2Json(pVotesGranted); + char * serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// SVotesRespond ----------------------------- SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond)); assert(pVotesRespond != NULL); @@ -65,6 +121,12 @@ SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { return pVotesRespond; } +void votesRespondDestory(SVotesRespond *pVotesRespond) { + if (pVotesRespond != NULL) { + free(pVotesRespond); + } +} + bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { bool ret = false; for (int i = 0; i < pVotesRespond->replicaNum; ++i) { diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index f7f0a0795c..18b7748105 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -12,6 +12,7 @@ add_executable(syncEnqTest "") add_executable(syncIndexTest "") add_executable(syncInitTest "") add_executable(syncUtilTest "") +add_executable(syncVotesGrantedTest "") target_sources(syncTest @@ -70,6 +71,10 @@ target_sources(syncUtilTest PRIVATE "syncUtilTest.cpp" ) +target_sources(syncVotesGrantedTest + PRIVATE + "syncVotesGrantedTest.cpp" +) target_include_directories(syncTest @@ -142,6 +147,11 @@ target_include_directories(syncUtilTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncVotesGrantedTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -200,6 +210,10 @@ target_link_libraries(syncUtilTest sync gtest_main ) +target_link_libraries(syncVotesGrantedTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp new file mode 100644 index 0000000000..97b807736b --- /dev/null +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -0,0 +1,80 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "syncVoteMgr.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[3] = {7010, 7110, 7210}; + +SSyncNode* doSync(int myIndex) { + SSyncFSM* pFsm; + + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = ports[0]; + snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = ports[1]; + snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = ports[2]; + snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + int myIndex = 0; + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + SSyncNode* pSyncNode = doSync(myIndex); + SVotesGranted* pVotesGranted = voteGrantedCreate(pSyncNode); + assert(pVotesGranted != NULL); + + char* serialized = voteGranted2Str(pVotesGranted); + assert(serialized != NULL); + printf("%s\n", serialized); + free(serialized); + + return 0; +} -- GitLab