From fa7f441f425f47b0aeced5e656502f993877a55d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 15:07:43 +0800 Subject: [PATCH] sync refactor --- include/libs/sync/sync.h | 25 ++--- source/libs/sync/inc/syncAppendEntries.h | 1 - source/libs/sync/inc/syncAppendEntriesReply.h | 1 - source/libs/sync/inc/syncOnMessage.h | 1 - source/libs/sync/inc/syncRaft.h | 93 ------------------- source/libs/sync/inc/syncRaftStore.h | 1 - source/libs/sync/inc/syncRequestVote.h | 1 - source/libs/sync/inc/syncRequestVoteReply.h | 1 - source/libs/sync/inc/syncSnapshot.h | 1 - source/libs/sync/inc/syncTimeout.h | 1 - source/libs/sync/inc/syncVoteMgr.h | 8 +- source/libs/sync/src/syncIO.c | 2 +- source/libs/sync/src/syncMain.c | 1 - source/libs/sync/src/syncMessage.c | 1 - source/libs/sync/src/syncRaft.c | 70 -------------- source/libs/sync/src/syncSnapshot.c | 1 - source/libs/sync/test/syncEnqTest.cpp | 1 + 17 files changed, 19 insertions(+), 191 deletions(-) delete mode 100644 source/libs/sync/inc/syncRaft.h delete mode 100644 source/libs/sync/src/syncRaft.c diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ccbeb00bfd..c83082e3e4 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,6 +24,7 @@ extern "C" { #include #include "taosdef.h" #include "trpc.h" +#include "wal.h" typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -93,19 +94,13 @@ typedef struct SSyncLogStore { void* data; // append one log entry - int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf); + int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pEntry); - // get one log entry, user need to free pBuf->data - int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf); + // get one log entry, user need to free pEntry->pCont + int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry); - // update log store commit index with "index" - int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); - - // truncate log with index, entries after the given index (>index) will be deleted - int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex index); - - // return commit index of log - SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); + // truncate log with index, entries after the given index (>=index) will be deleted + int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); // return index of last entry SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore); @@ -113,6 +108,12 @@ typedef struct SSyncLogStore { // return term of last entry SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore); + // update log store commit index with "index" + int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); + + // return commit index of log + SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); + } SSyncLogStore; // raft need to persist two variables in storage: currentTerm, voteFor @@ -134,7 +135,7 @@ typedef struct SSyncInfo { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - char walPath[TSDB_FILENAME_LEN]; + SWal* pWal; SSyncFSM* pFsm; void* rpcClient; diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 35d3046d66..5999ef8300 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 75b82aa531..c0c1f76707 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncOnMessage.h b/source/libs/sync/inc/syncOnMessage.h index 8eae4fed4d..7cb186a812 100644 --- a/source/libs/sync/inc/syncOnMessage.h +++ b/source/libs/sync/inc/syncOnMessage.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncRaft.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h deleted file mode 100644 index bc5cf26a4c..0000000000 --- a/source/libs/sync/inc/syncRaft.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_RAFT_H -#define _TD_LIBS_SYNC_RAFT_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include -#include "sync.h" -#include "syncMessage.h" -#include "taosdef.h" - -#if 0 - -typedef struct SRaftId { - SyncNodeId addr; - SyncGroupId vgId; -} SRaftId; - -typedef struct SRaft { - SRaftId id; - SSyncFSM* pFsm; - - int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg); - - int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg); - - int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg); - - int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg); - - int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg); - - int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg); - - int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg); - - int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg); - - int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg); - -} SRaft; - -SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm); - -void raftClose(SRaft* pRaft); - -static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg); - -static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg); - -static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg); - -static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg); - -static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg); - -static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg); - -static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg); - -static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg); - -static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg); - -int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); - -static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); - -#endif - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_LIBS_SYNC_RAFT_H*/ diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 591a5b9963..1c25b799b4 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -25,7 +25,6 @@ extern "C" { #include #include "cJSON.h" #include "syncInt.h" -#include "syncRaft.h" #include "taosdef.h" #define RAFT_STORE_BLOCK_SIZE 512 diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 8bb4976de2..fd4ccd5371 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index ab9430b857..bcaf71a541 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 89fcb230fb..611f33a0f2 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -24,7 +24,6 @@ extern "C" { #include #include #include "syncInt.h" -#include "syncRaft.h" #include "taosdef.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index efd5aae48e..25c26c909d 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index d437e459b9..ae9cfe8d01 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -45,8 +45,8 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted); bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); -cJSON *voteGranted2Json(SVotesGranted *pVotesGranted); -char *voteGranted2Str(SVotesGranted *pVotesGranted); +cJSON * voteGranted2Json(SVotesGranted *pVotesGranted); +char * voteGranted2Str(SVotesGranted *pVotesGranted); // SVotesRespond ----------------------------- typedef struct SVotesRespond { @@ -62,8 +62,8 @@ void votesRespondDestory(SVotesRespond *pVotesRespond); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); -cJSON *votesRespond2Json(SVotesRespond *pVotesRespond); -char *votesRespond2Str(SVotesRespond *pVotesRespond); +cJSON * votesRespond2Json(SVotesRespond *pVotesRespond); +char * votesRespond2Str(SVotesRespond *pVotesRespond); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index d37c821a24..7edf561f5b 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -15,7 +15,7 @@ #include "syncIO.h" #include -#include "syncOnMessage.h" +#include "syncMessage.h" #include "tglobal.h" #include "ttimer.h" #include "tutil.h" diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a7663be3a5..4d19444abd 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -20,7 +20,6 @@ #include "syncEnv.h" #include "syncIndexMgr.h" #include "syncInt.h" -#include "syncRaft.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncRequestVote.h" diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 14f139a803..6732cd3958 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -14,7 +14,6 @@ */ #include "syncMessage.h" -#include "syncRaft.h" #include "syncUtil.h" #include "tcoding.h" diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c deleted file mode 100644 index b07c6ea797..0000000000 --- a/source/libs/sync/src/syncRaft.c +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "syncRaft.h" -#include "sync.h" - -#if 0 - -SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) { - SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); - assert(pRaft != NULL); - - pRaft->id = raftId; - pRaft->pFsm = pFsm; - - pRaft->FpPing = doRaftPing; - pRaft->FpOnPing = onRaftPing; - pRaft->FpOnPingReply = onRaftPingReply; - - pRaft->FpRequestVote = doRaftRequestVote; - pRaft->FpOnRequestVote = onRaftRequestVote; - pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply; - - pRaft->FpAppendEntries = doRaftAppendEntries; - pRaft->FpOnAppendEntries = onRaftAppendEntries; - pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply; - - return pRaft; -} - -void raftClose(SRaft* pRaft) { - assert(pRaft != NULL); - free(pRaft); -} - -static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; } - -static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; } - -static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; } - -static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; } - -static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; } - -static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; } - -static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; } - -static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; } - -static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; } - -int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } - -static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } - -#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index da194780ff..42b2bd993b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,7 +14,6 @@ */ #include "syncSnapshot.h" -#include "syncRaft.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index e2bc9a73ae..e1706bb40b 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -2,6 +2,7 @@ #include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" +#include "syncMessage.h" #include "syncRaftStore.h" void logTest() { -- GitLab