From 0c8f62f701d263be26c84a30c0fbee300af39ba7 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 14:51:02 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/inc/syncRaftEntry.h | 12 ++- source/libs/sync/inc/syncRaftLog.h | 37 +++++--- source/libs/sync/inc/syncVoteMgr.h | 8 +- source/libs/sync/src/syncMain.c | 25 ++++-- source/libs/sync/src/syncRaftLog.c | 89 +++++++++++-------- source/libs/sync/test/syncIndexMgrTest.cpp | 1 - source/libs/sync/test/syncInitTest.cpp | 1 - source/libs/sync/test/syncPingTest.cpp | 1 - .../libs/sync/test/syncVotesGrantedTest.cpp | 1 - .../libs/sync/test/syncVotesRespondTest.cpp | 1 - 11 files changed, 108 insertions(+), 70 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8b77e292c4..1ca705441d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -116,7 +116,7 @@ typedef struct SSyncNode { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - char walPath[TSDB_FILENAME_LEN]; + SWal* pWal; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); void* queue; diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 516bef4d48..9cc05d44a9 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -27,10 +27,14 @@ extern "C" { #include "taosdef.h" typedef struct SSyncRaftEntry { - SyncTerm term; - SyncIndex index; - SSyncBuffer data; - int8_t flag; + uint32_t bytes; + uint32_t msgType; + SyncTerm term; + SyncIndex index; + int8_t flag; + uint32_t dataLen; + char data[]; + } SSyncRaftEntry; #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index ee971062cf..8205f19d91 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -24,27 +24,42 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncRaftEntry.h" #include "taosdef.h" -int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); +typedef struct SSyncLogStoreData { + SSyncNode* pSyncNode; + SWal* pWal; +} SSyncLogStoreData; -// get one log entry, user need to free pBuf->data -int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf); +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); -// update log store commit index with "index" -int32_t raftLogUpdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index); +void logStoreDestory(SSyncLogStore* pLogStore); -// truncate log with index, entries after the given index (>index) will be deleted -int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index); +// append one log entry +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SRpcMsg* pEntry); -// return commit index of log -SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore); +// get one log entry, user need to free pEntry->pCont +int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry); + +// truncate log with index, entries after the given index (>=index) will be deleted +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); // return index of last entry -SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore); +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); // return term of last entry -SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore); +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); + +// update log store commit index with "index" +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); + +// return commit index of log +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); + +cJSON* logStore2Json(SSyncLogStore* pLogStore); + +char* logStore2Str(SSyncLogStore* pLogStore); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index ae9cfe8d01..d437e459b9 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -45,8 +45,8 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted); bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); -cJSON * voteGranted2Json(SVotesGranted *pVotesGranted); -char * voteGranted2Str(SVotesGranted *pVotesGranted); +cJSON *voteGranted2Json(SVotesGranted *pVotesGranted); +char *voteGranted2Str(SVotesGranted *pVotesGranted); // SVotesRespond ----------------------------- typedef struct SVotesRespond { @@ -62,8 +62,8 @@ void votesRespondDestory(SVotesRespond *pVotesRespond); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); -cJSON * votesRespond2Json(SVotesRespond *pVotesRespond); -char * votesRespond2Str(SVotesRespond *pVotesRespond); +cJSON *votesRespond2Json(SVotesRespond *pVotesRespond); +char *votesRespond2Str(SVotesRespond *pVotesRespond); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3b8d716dbe..a7663be3a5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -18,8 +18,10 @@ #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" #include "syncEnv.h" +#include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncRequestVote.h" #include "syncRequestVoteReply.h" @@ -78,7 +80,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->syncCfg = pSyncInfo->syncCfg; memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); - memcpy(pSyncNode->walPath, pSyncInfo->walPath, sizeof(pSyncNode->walPath)); + pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->queue = pSyncInfo->queue; @@ -114,20 +116,26 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init life cycle - // init server vars + // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - pSyncNode->pRaftStore = raftStoreOpen(pSyncInfo->walPath); assert(pSyncNode->pRaftStore != NULL); - // init candidate vars + // init TLA+ candidate vars pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); assert(pSyncNode->pVotesGranted != NULL); pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); assert(pSyncNode->pVotesRespond != NULL); - // init leader vars - pSyncNode->pNextIndex = NULL; - pSyncNode->pMatchIndex = NULL; + // init TLA+ leader vars + pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); + assert(pSyncNode->pNextIndex != NULL); + pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); + assert(pSyncNode->pMatchIndex != NULL); + + // init TLA+ log vars + pSyncNode->pLogStore = logStoreCreate(pSyncNode); + assert(pSyncNode->pLogStore != NULL); + pSyncNode->commitIndex = 0; // init ping timer pSyncNode->pPingTimer = NULL; @@ -177,7 +185,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // init by SSyncInfo cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId); cJSON_AddStringToObject(pRoot, "path", pSyncNode->path); - cJSON_AddStringToObject(pRoot, "walPath", pSyncNode->walPath); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index e467057c8f..d9b053b42d 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -14,46 +14,61 @@ */ #include "syncRaftLog.h" +#include "wal.h" -int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } - -// get one log entry, user need to free pBuf->data -int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf) { return 0; } - -// TLA+ Spec -// \* Leader i advances its commitIndex. -// \* This is done as a separate step from handling AppendEntries responses, -// \* in part to minimize atomic regions, and in part so that leaders of -// \* single-server clusters are able to mark entries committed. -// AdvanceCommitIndex(i) == -// /\ state[i] = Leader -// /\ LET \* The set of servers that agree up through index. -// Agree(index) == {i} \cup {k \in Server : -// matchIndex[i][k] >= index} -// \* The maximum indexes for which a quorum agrees -// agreeIndexes == {index \in 1..Len(log[i]) : -// Agree(index) \in Quorum} -// \* New value for commitIndex'[i] -// newCommitIndex == -// IF /\ agreeIndexes /= {} -// /\ log[i][Max(agreeIndexes)].term = currentTerm[i] -// THEN -// Max(agreeIndexes) -// ELSE -// commitIndex[i] -// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] -// /\ UNCHANGED <> -// -int32_t raftLogupdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; } - -// truncate log with index, entries after the given index (>index) will be deleted -int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; } +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { + SSyncLogStore* pLogStore = malloc(sizeof(SSyncLogStore)); + assert(pLogStore != NULL); -// return commit index of log -SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore) { return 0; } + pLogStore->data = malloc(sizeof(SSyncLogStoreData)); + assert(pLogStore->data != NULL); + + SSyncLogStoreData* pData = pLogStore->data; + pData->pSyncNode = pSyncNode; + pData->pWal = pSyncNode->pWal; + + pLogStore->appendEntry = logStoreAppendEntry; + pLogStore->getEntry = logStoreGetEntry; + pLogStore->truncate = logStoreTruncate; + pLogStore->getLastIndex = logStoreLastIndex; + pLogStore->getLastTerm = logStoreLastTerm; + pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; + pLogStore->getCommitIndex = logStoreGetCommitIndex; +} + +void logStoreDestory(SSyncLogStore* pLogStore) { + if (pLogStore != NULL) { + free(pLogStore->data); + free(pLogStore); + } +} + +// append one log entry +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SRpcMsg* pEntry) {} + +// get one log entry, user need to free pEntry->pCont +int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry) {} + +// truncate log with index, entries after the given index (>=index) will be deleted +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {} // return index of last entry -SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore) { return 0; } +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {} // return term of last entry -SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore) { return 0; } \ No newline at end of file +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {} + +// update log store commit index with "index" +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {} + +// return commit index of log +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {} + +cJSON* logStore2Json(SSyncLogStore* pLogStore) {} + +char* logStore2Str(SSyncLogStore* pLogStore) { + cJSON* pJson = logStore2Json(pLogStore); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} \ No newline at end of file diff --git a/source/libs/sync/test/syncIndexMgrTest.cpp b/source/libs/sync/test/syncIndexMgrTest.cpp index 4e4cd9222b..9eb7b22b8e 100644 --- a/source/libs/sync/test/syncIndexMgrTest.cpp +++ b/source/libs/sync/test/syncIndexMgrTest.cpp @@ -34,7 +34,6 @@ SSyncNode* syncNodeInit() { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index 669c4e68a5..4aac24487e 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -31,7 +31,6 @@ SSyncNode* syncNodeInit() { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 450e097cc8..ae7977f270 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -26,7 +26,6 @@ SSyncNode* doSync(int myIndex) { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index 3edde509f8..a448ad44b6 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -33,7 +33,6 @@ SSyncNode* syncNodeInit() { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 74d42cd531..5cff8e0e26 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -33,7 +33,6 @@ SSyncNode* syncNodeInit() { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; -- GitLab