提交 fa7f441f 编写于 作者: M Minghao Li

sync refactor

上级 0c8f62f7
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
#include <tdatablock.h> #include <tdatablock.h>
#include "taosdef.h" #include "taosdef.h"
#include "trpc.h" #include "trpc.h"
#include "wal.h"
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
...@@ -93,19 +94,13 @@ typedef struct SSyncLogStore { ...@@ -93,19 +94,13 @@ typedef struct SSyncLogStore {
void* data; void* data;
// append one log entry // append one log entry
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf); int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pEntry);
// get one log entry, user need to free pBuf->data // get one log entry, user need to free pEntry->pCont
int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf); int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry);
// update log store commit index with "index" // truncate log with index, entries after the given index (>=index) will be deleted
int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
// truncate log with index, entries after the given index (>index) will be deleted
int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
// return index of last entry // return index of last entry
SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore);
...@@ -113,6 +108,12 @@ typedef struct SSyncLogStore { ...@@ -113,6 +108,12 @@ typedef struct SSyncLogStore {
// return term of last entry // return term of last entry
SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore); SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore);
// update log store commit index with "index"
int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
} SSyncLogStore; } SSyncLogStore;
// raft need to persist two variables in storage: currentTerm, voteFor // raft need to persist two variables in storage: currentTerm, voteFor
...@@ -134,7 +135,7 @@ typedef struct SSyncInfo { ...@@ -134,7 +135,7 @@ typedef struct SSyncInfo {
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
char walPath[TSDB_FILENAME_LEN]; SWal* pWal;
SSyncFSM* pFsm; SSyncFSM* pFsm;
void* rpcClient; void* rpcClient;
......
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h" #include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
// TLA+ Spec // TLA+ Spec
......
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h" #include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
// TLA+ Spec // TLA+ Spec
......
...@@ -23,7 +23,6 @@ extern "C" { ...@@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
#ifdef __cplusplus #ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_H
#define _TD_LIBS_SYNC_RAFT_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "sync.h"
#include "syncMessage.h"
#include "taosdef.h"
#if 0
typedef struct SRaftId {
SyncNodeId addr;
SyncGroupId vgId;
} SRaftId;
typedef struct SRaft {
SRaftId id;
SSyncFSM* pFsm;
int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg);
int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg);
int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg);
int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg);
int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg);
int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg);
int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg);
} SRaft;
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm);
void raftClose(SRaft* pRaft);
static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg);
static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg);
static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg);
static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg);
static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg);
static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg);
static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg);
static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg);
static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg);
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak);
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft);
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_RAFT_H*/
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "cJSON.h" #include "cJSON.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
#define RAFT_STORE_BLOCK_SIZE 512 #define RAFT_STORE_BLOCK_SIZE 512
......
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h" #include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
// TLA+ Spec // TLA+ Spec
......
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h" #include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
// TLA+ Spec // TLA+ Spec
......
...@@ -24,7 +24,6 @@ extern "C" { ...@@ -24,7 +24,6 @@ extern "C" {
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h" #include "syncInt.h"
#include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
......
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h" #include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
// TLA+ Spec // TLA+ Spec
......
...@@ -45,8 +45,8 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted); ...@@ -45,8 +45,8 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted);
bool voteGrantedMajority(SVotesGranted *pVotesGranted); bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted); cJSON * voteGranted2Json(SVotesGranted *pVotesGranted);
char *voteGranted2Str(SVotesGranted *pVotesGranted); char * voteGranted2Str(SVotesGranted *pVotesGranted);
// SVotesRespond ----------------------------- // SVotesRespond -----------------------------
typedef struct SVotesRespond { typedef struct SVotesRespond {
...@@ -62,8 +62,8 @@ void votesRespondDestory(SVotesRespond *pVotesRespond); ...@@ -62,8 +62,8 @@ void votesRespondDestory(SVotesRespond *pVotesRespond);
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
cJSON *votesRespond2Json(SVotesRespond *pVotesRespond); cJSON * votesRespond2Json(SVotesRespond *pVotesRespond);
char *votesRespond2Str(SVotesRespond *pVotesRespond); char * votesRespond2Str(SVotesRespond *pVotesRespond);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncIO.h" #include "syncIO.h"
#include <tdatablock.h> #include <tdatablock.h>
#include "syncOnMessage.h" #include "syncMessage.h"
#include "tglobal.h" #include "tglobal.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "syncEnv.h" #include "syncEnv.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaft.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncRequestVote.h" #include "syncRequestVote.h"
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "tcoding.h" #include "tcoding.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncRaft.h"
#include "sync.h"
#if 0
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) {
SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft));
assert(pRaft != NULL);
pRaft->id = raftId;
pRaft->pFsm = pFsm;
pRaft->FpPing = doRaftPing;
pRaft->FpOnPing = onRaftPing;
pRaft->FpOnPingReply = onRaftPingReply;
pRaft->FpRequestVote = doRaftRequestVote;
pRaft->FpOnRequestVote = onRaftRequestVote;
pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply;
pRaft->FpAppendEntries = doRaftAppendEntries;
pRaft->FpOnAppendEntries = onRaftAppendEntries;
pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply;
return pRaft;
}
void raftClose(SRaft* pRaft) {
assert(pRaft != NULL);
free(pRaft);
}
static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; }
static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; }
static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; }
static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; }
static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; }
static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; }
static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; }
static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; }
static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; }
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; }
#endif
\ No newline at end of file
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "syncSnapshot.h" #include "syncSnapshot.h"
#include "syncRaft.h"
int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "syncEnv.h" #include "syncEnv.h"
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
void logTest() { void logTest() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册