提交 e7775aa0 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

......@@ -26,7 +26,7 @@ extern "C" {
typedef int32_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef uint64_t SSyncTerm;
typedef uint64_t SyncTerm;
typedef enum {
TAOS_SYNC_ROLE_FOLLOWER = 0,
......@@ -61,13 +61,13 @@ typedef struct {
typedef struct SSyncFSM {
void* pData;
// apply committed log, bufs will be free by raft module
// apply committed log, bufs will be free by sync module
int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
// cluster commit callback
int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
// fsm return snapshot in ppBuf, bufs will be free by raft module
// fsm return snapshot in ppBuf, bufs will be free by sync module
// TODO: getSnapshot SHOULD be async?
int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast);
......@@ -89,19 +89,30 @@ typedef struct SSyncLogStore {
// write log with given index
int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf);
/**
* read log from given index(included) with limit, return the actual num in nBuf,
* pBuf will be free in sync module
**/
int32_t (*logRead)(struct SSyncLogStore* logStore, SyncIndex index, int limit,
SSyncBuffer* pBuf, int* nBuf);
// mark log with given index has been commtted
int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index);
// prune log before given index
// prune log before given index(not included)
int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index);
// rollback log after given index
// rollback log after given index(included)
int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index);
// return last index of log
SyncIndex (*logLastIndex)(struct SSyncLogStore* logStore);
} SSyncLogStore;
typedef struct SSyncServerState {
SyncNodeId voteFor;
SSyncTerm term;
SyncTerm term;
SyncIndex commitIndex;
} SSyncServerState;
typedef struct SSyncClusterConfig {
......@@ -122,9 +133,9 @@ typedef struct SStateManager {
int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state);
// void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
// const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng);
const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng);
} SStateManager;
typedef struct {
......@@ -146,13 +157,13 @@ SSyncNode* syncStart(const SSyncInfo*);
void syncReconfig(const SSyncNode*, const SSyncCluster*);
void syncStop(const SSyncNode*);
int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak);
// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
extern int32_t syncDebugFlag;
extern int32_t sDebugFlag;
#ifdef __cplusplus
}
......
......@@ -4,6 +4,7 @@ add_library(sync ${SYNC_SRC})
target_link_libraries(
sync
PUBLIC common
PUBLIC transport
PUBLIC util
PUBLIC wal
)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_H
#define _TD_LIBS_SYNC_RAFT_H
#include "sync.h"
#include "sync_type.h"
#include "raft_message.h"
#include "sync_raft_impl.h"
#include "sync_raft_quorum.h"
typedef struct RaftLeaderState {
} RaftLeaderState;
typedef struct RaftCandidateState {
/* true if in pre-vote phase */
bool inPreVote;
} RaftCandidateState;
typedef struct SSyncRaftIOMethods {
// send SSyncMessage to node
int (*send)(const SSyncMessage* pMsg, const SNodeInfo* pNode);
} SSyncRaftIOMethods;
typedef int (*SyncRaftStepFp)(SSyncRaft* pRaft, const SSyncMessage* pMsg);
typedef void (*SyncRaftTickFp)(SSyncRaft* pRaft);
struct SSyncRaft {
// owner sync node
SSyncNode* pNode;
SSyncCluster cluster;
int selfIndex;
SyncNodeId selfId;
SyncGroupId selfGroupId;
SSyncRaftIOMethods io;
SSyncFSM fsm;
SSyncLogStore logStore;
SStateManager stateManager;
union {
RaftLeaderState leaderState;
RaftCandidateState candidateState;
};
SyncTerm term;
SyncNodeId voteFor;
SSyncRaftLog *log;
int maxMsgSize;
SSyncRaftProgressTracker *tracker;
ESyncRole state;
// isLearner is true if the local raft node is a learner.
bool isLearner;
/**
* the leader id
**/
SyncNodeId leaderId;
/**
* leadTransferee is id of the leader transfer target when its value is not zero.
* Follow the procedure defined in raft thesis 3.10.
**/
SyncNodeId leadTransferee;
/**
* Only one conf change may be pending (in the log, but not yet
* applied) at a time. This is enforced via pendingConfIndex, which
* is set to a value >= the log index of the latest pending
* configuration change (if any). Config changes are only allowed to
* be proposed if the leader's applied index is greater than this
* value.
**/
SyncIndex pendingConfigIndex;
/**
* an estimate of the size of the uncommitted tail of the Raft log. Used to
* prevent unbounded log growth. Only maintained by the leader. Reset on
* term changes.
**/
uint32_t uncommittedSize;
/**
* number of ticks since it reached last electionTimeout when it is leader
* or candidate.
* number of ticks since it reached last electionTimeout or received a
* valid message from current leader when it is a follower.
**/
uint16_t electionElapsed;
/**
* number of ticks since it reached last heartbeatTimeout.
* only leader keeps heartbeatElapsed.
**/
uint16_t heartbeatElapsed;
bool preVote;
bool checkQuorum;
int heartbeatTimeout;
int electionTimeout;
/**
* randomizedElectionTimeout is a random number between
* [electiontimeout, 2 * electiontimeout - 1]. It gets reset
* when raft changes its state to follower or candidate.
**/
int randomizedElectionTimeout;
bool disableProposalForwarding;
// current tick count since start up
uint32_t currentTick;
SyncRaftStepFp stepFp;
SyncRaftTickFp tickFp;
};
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo);
int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int32_t syncRaftTick(SSyncRaft* pRaft);
#endif /* _TD_LIBS_SYNC_RAFT_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_CONFIGURATION_H
#define _TD_LIBS_SYNC_RAFT_CONFIGURATION_H
#include "sync.h"
#include "sync_type.h"
// return -1 if cannot find this id
int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id);
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft);
#endif /* _TD_LIBS_SYNC_RAFT_CONFIGURATION_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_LOG_H
#define _TD_LIBS_SYNC_RAFT_LOG_H
#include "sync.h"
#include "sync_type.h"
typedef enum SyncEntryType {
SYNC_ENTRY_TYPE_LOG = 1,
}SyncEntryType;
struct SSyncRaftEntry {
SyncTerm term;
SyncIndex index;
SyncEntryType type;
SSyncBuffer buffer;
};
struct SSyncRaftLog {
SyncIndex uncommittedConfigIndex;
SyncIndex commitIndex;
SyncIndex appliedIndex;
};
SSyncRaftLog* syncRaftLogOpen();
SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog);
SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog);
SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog);
bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term);
int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog);
bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog);
SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index);
int syncRaftLogAppend(SSyncRaftLog* pLog, SSyncRaftEntry *pEntries, int n);
int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize,
SSyncRaftEntry **ppEntries, int *n);
void syncRaftLogRelease(SSyncRaftLog* pLog, SyncIndex index,
SSyncRaftEntry *pEntries, int n);
bool syncRaftLogMatchTerm();
static FORCE_INLINE bool syncRaftLogIsCommitted(SSyncRaftLog* pLog, SyncIndex index) {
return pLog->commitIndex > index;
}
#endif /* _TD_LIBS_SYNC_RAFT_LOG_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_MESSAGE_H
#define _TD_LIBS_SYNC_RAFT_MESSAGE_H
#include "sync.h"
#include "sync_type.h"
/**
* below define message type which handled by Raft.
*
* internal message, which communicate between threads, start with RAFT_MSG_INTERNAL_*.
* internal message use pointer only and stack memory, need not to be decode/encode and free.
*
* outter message start with RAFT_MSG_*, which communicate between cluster peers,
* need to implement its decode/encode functions.
**/
typedef enum RaftMessageType {
// client propose a cmd
RAFT_MSG_INTERNAL_PROP = 1,
// node election timeout
RAFT_MSG_INTERNAL_ELECTION = 2,
RAFT_MSG_VOTE = 3,
RAFT_MSG_VOTE_RESP = 4,
RAFT_MSG_APPEND = 5,
RAFT_MSG_APPEND_RESP = 6,
} RaftMessageType;
typedef struct RaftMsgInternal_Prop {
const SSyncBuffer *pBuf;
bool isWeak;
void* pData;
} RaftMsgInternal_Prop;
typedef struct RaftMsgInternal_Election {
} RaftMsgInternal_Election;
typedef struct RaftMsg_Vote {
SyncRaftElectionType cType;
SyncIndex lastIndex;
SyncTerm lastTerm;
} RaftMsg_Vote;
typedef struct RaftMsg_VoteResp {
bool rejected;
SyncRaftElectionType cType;
} RaftMsg_VoteResp;
typedef struct RaftMsg_Append_Entries {
// index of log entry preceeding new ones
SyncIndex index;
// term of entry at prevIndex
SyncTerm term;
// leader's commit index.
SyncIndex commitIndex;
// size of the log entries array
int nEntries;
// log entries array
SSyncRaftEntry* entries;
} RaftMsg_Append_Entries;
typedef struct RaftMsg_Append_Resp {
SyncIndex index;
} RaftMsg_Append_Resp;
typedef struct SSyncMessage {
RaftMessageType msgType;
SyncTerm term;
SyncGroupId groupId;
SyncNodeId from;
union {
RaftMsgInternal_Prop propose;
RaftMsgInternal_Election election;
RaftMsg_Vote vote;
RaftMsg_VoteResp voteResp;
RaftMsg_Append_Entries appendEntries;
RaftMsg_Append_Resp appendResp;
};
} SSyncMessage;
static FORCE_INLINE SSyncMessage* syncInitPropMsg(SSyncMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
*pMsg = (SSyncMessage) {
.msgType = RAFT_MSG_INTERNAL_PROP,
.term = 0,
.propose = (RaftMsgInternal_Prop) {
.isWeak = isWeak,
.pBuf = pBuf,
.pData = pData,
},
};
return pMsg;
}
static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNodeId from) {
*pMsg = (SSyncMessage) {
.msgType = RAFT_MSG_INTERNAL_ELECTION,
.term = 0,
.from = from,
.election = (RaftMsgInternal_Election) {
},
};
return pMsg;
}
static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from,
SyncTerm term, SyncRaftElectionType cType,
SyncIndex lastIndex, SyncTerm lastTerm) {
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
if (pMsg == NULL) {
return NULL;
}
*pMsg = (SSyncMessage) {
.groupId = groupId,
.from = from,
.term = term,
.msgType = RAFT_MSG_VOTE,
.vote = (RaftMsg_Vote) {
.cType = cType,
.lastIndex = lastIndex,
.lastTerm = lastTerm,
},
};
return pMsg;
}
static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from,
SyncRaftElectionType cType, bool rejected) {
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
if (pMsg == NULL) {
return NULL;
}
*pMsg = (SSyncMessage) {
.groupId = groupId,
.from = from,
.msgType = RAFT_MSG_VOTE_RESP,
.voteResp = (RaftMsg_VoteResp) {
.cType = cType,
.rejected = rejected,
},
};
return pMsg;
}
static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNodeId from,
SyncTerm term, SyncIndex logIndex, SyncTerm logTerm,
SyncIndex commitIndex, int nEntries, SSyncRaftEntry* entries) {
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
if (pMsg == NULL) {
return NULL;
}
*pMsg = (SSyncMessage) {
.groupId = groupId,
.from = from,
.term = term,
.msgType = RAFT_MSG_APPEND,
.appendEntries = (RaftMsg_Append_Entries) {
.index = logIndex,
.term = logTerm,
.commitIndex = commitIndex,
.nEntries = nEntries,
.entries = entries,
},
};
return pMsg;
}
static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId, SyncNodeId from, SyncTerm term) {
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
if (pMsg == NULL) {
return NULL;
}
*pMsg = (SSyncMessage) {
.groupId = groupId,
.from = from,
.term = term,
.msgType = RAFT_MSG_APPEND_RESP,
.appendResp = (RaftMsg_Append_Resp) {
},
};
return pMsg;
}
static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
return msgType == RAFT_MSG_INTERNAL_PROP ||
msgType == RAFT_MSG_INTERNAL_ELECTION;
}
static FORCE_INLINE bool syncIsPreVoteRespMsg(const SSyncMessage* pMsg) {
return pMsg->msgType == RAFT_MSG_VOTE_RESP && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
}
static FORCE_INLINE bool syncIsPreVoteMsg(const SSyncMessage* pMsg) {
return pMsg->msgType == RAFT_MSG_VOTE && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
}
void syncFreeMessage(const SSyncMessage* pMsg);
// message handlers
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_REPLICATION_H
#define TD_SYNC_RAFT_REPLICATION_H
#include "sync.h"
#include "syncInt.h"
#include "sync_type.h"
int syncRaftReplicate(SSyncRaft* pRaft, int i);
#endif /* TD_SYNC_RAFT_REPLICATION_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_UNSTABLE_LOG_H
#define TD_SYNC_RAFT_UNSTABLE_LOG_H
#include "sync_type.h"
/* in-memory unstable raft log storage */
struct SSyncRaftUnstableLog {
#if 0
/* Circular buffer of log entries */
RaftEntry *entries;
/* size of Circular buffer */
int size;
/* Indexes of used slots [front, back) */
int front, back;
/* Index of first entry is offset + 1 */
SyncIndex offset;
/* meta data of snapshot */
SSyncRaftUnstableLog snapshot;
#endif
};
/**
* return index of last in memory log, return 0 if log is empty
**/
//SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog);
#if 0
void raftLogInit(RaftLog* pLog);
void raftLogClose(RaftLog* pLog);
/**
* When startup populating log entrues loaded from disk,
* init raft memory log with snapshot index,term and log start idnex.
**/
/*
void raftLogStart(RaftLog* pLog,
RaftSnapshotMeta snapshot,
SyncIndex startIndex);
*/
/**
* Get the number of entries the log.
**/
int raftLogNumEntries(const RaftLog* pLog);
/**
* return last term of in memory log, return 0 if log is empty
**/
SyncTerm raftLogLastTerm(RaftLog* pLog);
/**
* return term of log with the given index, return 0 if the term of index cannot be found
* , errCode will save the error code.
**/
SyncTerm raftLogTermOf(RaftLog* pLog, SyncIndex index, RaftCode* errCode);
/**
* Get the last index of the most recent snapshot. Return 0 if there are no *
* snapshots.
**/
SyncIndex raftLogSnapshotIndex(RaftLog* pLog);
/* Append a new entry to the log. */
int raftLogAppend(RaftLog* pLog,
SyncTerm term,
const SSyncBuffer *buf);
/**
* acquire log from given index onwards.
**/
/*
int raftLogAcquire(RaftLog* pLog,
SyncIndex index,
RaftEntry **ppEntries,
int *n);
void raftLogRelease(RaftLog* pLog,
SyncIndex index,
RaftEntry *pEntries,
int n);
*/
/* Delete all entries from the given index (included) onwards. */
void raftLogTruncate(RaftLog* pLog, SyncIndex index);
/**
* when taking a new snapshot, the function will update the last snapshot information and delete
* all entries up last_index - trailing (included). If the log contains no entry
* a last_index - trailing, then no entry will be deleted.
**/
void raftLogSnapshot(RaftLog* pLog, SyncIndex index, SyncIndex trailing);
#endif
#endif /* TD_SYNC_RAFT_UNSTABLE_LOG_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_INT_H
#define _TD_LIBS_SYNC_INT_H
#include "thash.h"
#include "os.h"
#include "sync.h"
#include "sync_type.h"
#include "raft.h"
#include "tlog.h"
#define TAOS_SYNC_MAX_WORKER 3
typedef struct SSyncWorker {
pthread_t thread;
} SSyncWorker;
struct SSyncNode {
pthread_mutex_t mutex;
int32_t refCount;
SyncGroupId vgId;
SSyncRaft raft;
void* syncTimer;
};
typedef struct SSyncManager {
pthread_mutex_t mutex;
// sync server rpc
void* serverRpc;
// rpc server hash table base on FQDN:port key
SHashObj* rpcServerTable;
// sync client rpc
void* clientRpc;
// worker threads
SSyncWorker worker[TAOS_SYNC_MAX_WORKER];
// vgroup hash table
SHashObj* vgroupTable;
// timer manager
void* syncTimerManager;
} SSyncManager;
extern SSyncManager* gSyncManager;
#define syncFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYNC FATAL ", 255, __VA_ARGS__); }} while(0)
#define syncError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYNC ERROR ", 255, __VA_ARGS__); }} while(0)
#define syncWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYNC WARN ", 255, __VA_ARGS__); }} while(0)
#define syncInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYNC ", 255, __VA_ARGS__); }} while(0)
#define syncDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYNC ", sDebugFlag, __VA_ARGS__); }} while(0)
#define syncTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYNC ", sDebugFlag, __VA_ARGS__); }} while(0)
#endif /* _TD_LIBS_SYNC_INT_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_IMPL_H
#define _TD_LIBS_SYNC_RAFT_IMPL_H
#include "sync.h"
#include "sync_type.h"
#include "raft_message.h"
#include "sync_raft_quorum.h"
void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId);
void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
void syncRaftBecomeCandidate(SSyncRaft* pRaft);
void syncRaftBecomeLeader(SSyncRaft* pRaft);
void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType);
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft);
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
bool syncRaftIsPromotable(SSyncRaft* pRaft);
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
int syncRaftQuorum(SSyncRaft* pRaft);
SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
bool preVote, bool accept,
int* rejectNum, int *granted);
#endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http: *www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_INFLIGHTS_H
#define TD_SYNC_RAFT_INFLIGHTS_H
#include "sync.h"
/**
* SSyncRaftInflights limits the number of MsgApp (represented by the largest index
* contained within) sent to followers but not yet acknowledged by them. Callers
* use syncRaftInflightFull() to check whether more messages can be sent,
* call syncRaftInflightAdd() whenever they are sending a new append,
* and release "quota" via FreeLE() whenever an ack is received.
**/
typedef struct SSyncRaftInflights {
/* the starting index in the buffer */
int start;
/* number of inflights in the buffer */
int count;
/* the size of the buffer */
int size;
/**
* buffer contains the index of the last entry
* inside one message.
**/
SyncIndex* buffer;
} SSyncRaftInflights;
SSyncRaftInflights* syncRaftOpenInflights(int size);
void syncRaftCloseInflights(SSyncRaftInflights*);
static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) {
inflights->count = 0;
inflights->start = 0;
}
static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) {
return inflights->count == inflights->size;
}
/**
* syncRaftInflightAdd notifies the Inflights that a new message with the given index is being
* dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd()
* to verify that there is room for one more message,
* and consecutive calls to add syncRaftInflightAdd() must provide a
* monotonic sequence of indexes.
**/
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex);
/**
* syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
**/
void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex);
/**
* syncRaftInflightFreeFirstOne releases the first inflight.
* This is a no-op if nothing is inflight.
**/
void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights);
#endif /* TD_SYNC_RAFT_INFLIGHTS_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http: *www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_PROGRESS_H
#define TD_SYNC_RAFT_PROGRESS_H
#include "sync_type.h"
#include "sync_raft_inflights.h"
/**
* State defines how the leader should interact with the follower.
*
* When in PROGRESS_STATE_PROBE, leader sends at most one replication message
* per heartbeat interval. It also probes actual progress of the follower.
*
* When in PROGRESS_STATE_REPLICATE, leader optimistically increases next
* to the latest entry sent after sending replication message. This is
* an optimized state for fast replicating log entries to the follower.
*
* When in PROGRESS_STATE_SNAPSHOT, leader should have sent out snapshot
* before and stops sending any replication message.
*
* PROGRESS_STATE_PROBE is the initial state.
**/
typedef enum RaftProgressState {
/**
* StateProbe indicates a follower whose last index isn't known. Such a
* follower is "probed" (i.e. an append sent periodically) to narrow down
* its last index. In the ideal (and common) case, only one round of probing
* is necessary as the follower will react with a hint. Followers that are
* probed over extended periods of time are often offline.
**/
PROGRESS_STATE_PROBE = 0,
/**
* StateReplicate is the state steady in which a follower eagerly receives
* log entries to append to its log.
**/
PROGRESS_STATE_REPLICATE,
/**
* StateSnapshot indicates a follower that needs log entries not available
* from the leader's Raft log. Such a follower needs a full snapshot to
* return to StateReplicate.
**/
PROGRESS_STATE_SNAPSHOT,
} RaftProgressState;
/**
* Progress represents a follower’s progress in the view of the leader. Leader maintains
* progresses of all followers, and sends entries to the follower based on its progress.
**/
struct SSyncRaftProgress {
SyncNodeId id;
SyncIndex nextIndex;
SyncIndex matchIndex;
/**
* State defines how the leader should interact with the follower.
*
* When in StateProbe, leader sends at most one replication message
* per heartbeat interval. It also probes actual progress of the follower.
*
* When in StateReplicate, leader optimistically increases next
* to the latest entry sent after sending replication message. This is
* an optimized state for fast replicating log entries to the follower.
*
* When in StateSnapshot, leader should have sent out snapshot
* before and stops sending any replication message.
**/
RaftProgressState state;
/**
* pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT.
* If there is a pending snapshot, the pendingSnapshotIndex will be set to the
* index of the snapshot. If pendingSnapshotIndex is set, the replication process of
* this Progress will be paused. raft will not resend snapshot until the pending one
* is reported to be failed.
**/
SyncIndex pendingSnapshotIndex;
/**
* recentActive is true if the progress is recently active. Receiving any messages
* from the corresponding follower indicates the progress is active.
* RecentActive can be reset to false after an election timeout.
**/
bool recentActive;
/**
* probeSent is used while this follower is in StateProbe. When probeSent is
* true, raft should pause sending replication message to this peer until
* probeSent is reset. See ProbeAcked() and IsPaused().
**/
bool probeSent;
/**
* inflights is a sliding window for the inflight messages.
* Each inflight message contains one or more log entries.
* The max number of entries per message is defined in raft config as MaxSizePerMsg.
* Thus inflight effectively limits both the number of inflight messages
* and the bandwidth each Progress can use.
* When inflights is Full, no more message should be sent.
* When a leader sends out a message, the index of the last
* entry should be added to inflights. The index MUST be added
* into inflights in order.
* When a leader receives a reply, the previous inflights should
* be freed by calling inflights.FreeLE with the index of the last
* received entry.
**/
SSyncRaftInflights* inflights;
/**
* IsLearner is true if this progress is tracked for a learner.
**/
bool isLearner;
};
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress);
/**
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
* optionally and if larger, the index of the pending snapshot.
**/
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress);
/**
* syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
**/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress);
/**
* syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the
* index acked by it. The method returns false if the given n index comes from
* an outdated message. Otherwise it updates the progress and returns true.
**/
bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex);
/**
* syncRaftProgressOptimisticNextIndex signals that appends all the way up to and including index n
* are in-flight. As a result, Next is increased to n+1.
**/
static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) {
progress->nextIndex = nextIndex + 1;
}
/**
* syncRaftProgressMaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
* arguments are the index of the append message rejected by the follower, and
* the hint that we want to decrease to.
*
* Rejections can happen spuriously as messages are sent out of order or
* duplicated. In such cases, the rejection pertains to an index that the
* Progress already knows were previously acknowledged, and false is returned
* without changing the Progress.
*
* If the rejection is genuine, Next is lowered sensibly, and the Progress is
* cleared for sending log entries.
**/
bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
SyncIndex rejected, SyncIndex matchHint);
/**
* syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled.
* This is done when a node has rejected recent MsgApps, is currently waiting
* for a snapshot, or has reached the MaxInflightMsgs limit. In normal
* operation, this is false. A throttled node will be contacted less frequently
* until it has reached a state in which it's able to accept a steady stream of
* log entries again.
**/
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress);
static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) {
return progress->nextIndex;
}
static FORCE_INLINE RaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) {
return progress->state == PROGRESS_STATE_REPLICATE;
}
static FORCE_INLINE RaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) {
return progress->state == PROGRESS_STATE_SNAPSHOT;
}
static FORCE_INLINE RaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) {
return progress->state == PROGRESS_STATE_PROBE;
}
static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progress) {
return progress->recentActive;
}
/**
* return true if progress's log is up-todate
**/
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress);
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex);
#if 0
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i);
SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i);
void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i);
void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i);
bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i);
void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i);
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i);
#endif
#endif /* TD_SYNC_RAFT_PROGRESS_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H
#define _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H
#include "sync_type.h"
#include "sync_raft_quorum_joint.h"
#include "sync_raft_progress.h"
struct SSyncRaftProgressTrackerConfig {
SSyncRaftQuorumJointConfig voters;
/**
* autoLeave is true if the configuration is joint and a transition to the
* incoming configuration should be carried out automatically by Raft when
* this is possible. If false, the configuration will be joint until the
* application initiates the transition manually.
**/
bool autoLeave;
/**
* Learners is a set of IDs corresponding to the learners active in the
* current configuration.
*
* Invariant: Learners and Voters does not intersect, i.e. if a peer is in
* either half of the joint config, it can't be a learner; if it is a
* learner it can't be in either half of the joint config. This invariant
* simplifies the implementation since it allows peers to have clarity about
* its current role without taking into account joint consensus.
**/
SyncNodeId learners[TSDB_MAX_REPLICA];
/**
* When we turn a voter into a learner during a joint consensus transition,
* we cannot add the learner directly when entering the joint state. This is
* because this would violate the invariant that the intersection of
* voters and learners is empty. For example, assume a Voter is removed and
* immediately re-added as a learner (or in other words, it is demoted):
*
* Initially, the configuration will be
*
* voters: {1 2 3}
* learners: {}
*
* and we want to demote 3. Entering the joint configuration, we naively get
*
* voters: {1 2} & {1 2 3}
* learners: {3}
*
* but this violates the invariant (3 is both voter and learner). Instead,
* we get
*
* voters: {1 2} & {1 2 3}
* learners: {}
* next_learners: {3}
*
* Where 3 is now still purely a voter, but we are remembering the intention
* to make it a learner upon transitioning into the final configuration:
*
* voters: {1 2}
* learners: {3}
* next_learners: {}
*
* Note that next_learners is not used while adding a learner that is not
* also a voter in the joint config. In this case, the learner is added
* right away when entering the joint configuration, so that it is caught up
* as soon as possible.
**/
SyncNodeId learnersNext[TSDB_MAX_REPLICA];
};
struct SSyncRaftProgressTracker {
SSyncRaftProgressTrackerConfig config;
SSyncRaftProgress progressMap[TSDB_MAX_REPLICA];
SyncRaftVoteResult votes[TSDB_MAX_REPLICA];
int maxInflight;
};
SSyncRaftProgressTracker* syncRaftOpenProgressTracker();
void syncRaftResetVotes(SSyncRaftProgressTracker*);
typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg);
void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg);
/**
* syncRaftRecordVote records that the node with the given id voted for this Raft
* instance if v == true (and declined it otherwise).
**/
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant);
/**
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
* election outcome is known.
**/
SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted);
#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_QUORUM_H
#define TD_SYNC_RAFT_QUORUM_H
/**
* SSyncRaftVoteResult indicates the outcome of a vote.
**/
typedef enum {
/**
* SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future
* votes, i.e. neither "yes" or "no" has reached quorum yet.
**/
SYNC_RAFT_VOTE_PENDING = 1,
/**
* SYNC_RAFT_VOTE_LOST indicates that the quorum has voted "no".
**/
SYNC_RAFT_VOTE_LOST = 2,
/**
* SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes".
**/
SYNC_RAFT_VOTE_WON = 3,
} SSyncRaftVoteResult;
#endif /* TD_SYNC_RAFT_QUORUM_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H
#define _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H
#include "taosdef.h"
#include "sync.h"
#include "sync_type.h"
/**
* SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping)
* majority configurations. Decisions require the support of both majorities.
**/
typedef struct SSyncRaftQuorumJointConfig {
SSyncCluster majorityConfig[2];
}SSyncRaftQuorumJointConfig;
/**
* syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending, lost, or won. A joint quorum
* requires both majority quorums to vote in favor.
**/
SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes);
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H
#define _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H
#include "sync.h"
#include "sync_type.h"
/**
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending (i.e. neither a quorum of
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached).
**/
SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes);
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_TYPE_H
#define _TD_LIBS_SYNC_TYPE_H
#include <stdint.h>
#include "osMath.h"
#define SYNC_NON_NODE_ID -1
#define SYNC_NON_TERM 0
typedef int32_t SyncTime;
typedef uint32_t SyncTick;
typedef struct SSyncRaft SSyncRaft;
typedef struct SSyncRaftProgress SSyncRaftProgress;
typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig;
typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker;
typedef struct SSyncRaftLog SSyncRaftLog;
typedef struct SSyncRaftEntry SSyncRaftEntry;
#if 0
#ifndef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))
#endif
#ifndef MAX
#define MAX(x, y) (((x) > (y)) ? (x) : (y))
#endif
#endif
typedef enum {
SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
SYNC_RAFT_CAMPAIGN_ELECTION = 1,
SYNC_RAFT_CAMPAIGN_TRANSFER = 2,
} SyncRaftElectionType;
typedef enum {
// the init vote resp status
SYNC_RAFT_VOTE_RESP_UNKNOWN = 0,
// grant the vote request
SYNC_RAFT_VOTE_RESP_GRANT = 1,
//reject the vote request
SYNC_RAFT_VOTE_RESP_REJECT = 2,
} SyncRaftVoteResult;
#endif /* _TD_LIBS_SYNC_TYPE_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft.h"
#include "raft_configuration.h"
#include "raft_log.h"
#include "raft_replication.h"
#include "sync_raft_progress_tracker.h"
#include "syncInt.h"
#define RAFT_READ_LOG_MAX_NUM 100
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
SSyncNode* pNode = pRaft->pNode;
SSyncServerState serverState;
SStateManager* stateManager;
SSyncLogStore* logStore;
SSyncFSM* fsm;
SyncIndex initIndex = pInfo->snapshotIndex;
SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM];
int nBuf, limit, i;
memset(pRaft, 0, sizeof(SSyncRaft));
memcpy(&pRaft->fsm, &pInfo->fsm, sizeof(SSyncFSM));
memcpy(&pRaft->logStore, &pInfo->logStore, sizeof(SSyncLogStore));
memcpy(&pRaft->stateManager, &pInfo->stateManager, sizeof(SStateManager));
stateManager = &(pRaft->stateManager);
logStore = &(pRaft->logStore);
fsm = &(pRaft->fsm);
// init progress tracker
pRaft->tracker = syncRaftOpenProgressTracker();
if (pRaft->tracker == NULL) {
return -1;
}
// open raft log
if ((pRaft->log = syncRaftLogOpen()) == NULL) {
return -1;
}
// read server state
if (stateManager->readServerState(stateManager, &serverState) != 0) {
syncError("readServerState for vgid %d fail", pInfo->vgId);
return -1;
}
assert(initIndex <= serverState.commitIndex);
// restore fsm state from snapshot index + 1 until commitIndex
++initIndex;
while (initIndex <= serverState.commitIndex) {
limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex + 1);
if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) {
return -1;
}
assert(limit == nBuf);
for (i = 0; i < limit; ++i) {
fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL);
free(buffer[i].data);
}
initIndex += nBuf;
}
assert(initIndex == serverState.commitIndex);
//pRaft->heartbeatTimeoutTick = 1;
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
pRaft->selfIndex = pRaft->cluster.selfIndex;
syncInfo("[%d:%d] restore vgid %d state: snapshot index success",
pRaft->selfGroupId, pRaft->selfId, pInfo->vgId);
return 0;
}
int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
syncDebug("from %d, type:%d, term:%" PRId64 ", state:%d",
pMsg->from, pMsg->msgType, pMsg->term, pRaft->state);
if (preHandleMessage(pRaft, pMsg)) {
syncFreeMessage(pMsg);
return 0;
}
RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
syncRaftHandleElectionMessage(pRaft, pMsg);
} else if (msgType == RAFT_MSG_VOTE) {
syncRaftHandleVoteMessage(pRaft, pMsg);
} else {
pRaft->stepFp(pRaft, pMsg);
}
syncFreeMessage(pMsg);
return 0;
}
int32_t syncRaftTick(SSyncRaft* pRaft) {
pRaft->currentTick += 1;
return 0;
}
/**
* pre-handle message, return true means no need to continue
* Handle the message term, which may result in our stepping down to a follower.
**/
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
// local message?
if (pMsg->term == 0) {
return false;
}
if (pMsg->term > pRaft->term) {
return preHandleNewTermMessage(pRaft, pMsg);
} else if (pMsg->term < pRaft->term) {
return preHandleOldTermMessage(pRaft, pMsg);
}
return false;
}
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
SyncNodeId leaderId = pMsg->from;
RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_VOTE) {
// TODO
leaderId = SYNC_NON_NODE_ID;
}
if (syncIsPreVoteMsg(pMsg)) {
// Never change our term in response to a PreVote
} else if (syncIsPreVoteRespMsg(pMsg) && !pMsg->voteResp.rejected) {
/**
* We send pre-vote requests with a term in our future. If the
* pre-vote is granted, we will increment our term when we get a
* quorum. If it is not, the term comes from the node that
* rejected our vote so we should become a follower at the new
* term.
**/
} else {
syncInfo("[%d:%d] [term:%" PRId64 "] received a %d message with higher term from %d [term:%" PRId64 "]",
pRaft->selfGroupId, pRaft->selfId, pRaft->term, msgType, pMsg->from, pMsg->term);
syncRaftBecomeFollower(pRaft, pMsg->term, leaderId);
}
return false;
}
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (pRaft->checkQuorum && pMsg->msgType == RAFT_MSG_APPEND) {
/**
* We have received messages from a leader at a lower term. It is possible
* that these messages were simply delayed in the network, but this could
* also mean that this node has advanced its term number during a network
* partition, and it is now unable to either win an election or to rejoin
* the majority on the old term. If checkQuorum is false, this will be
* handled by incrementing term numbers in response to MsgVote with a
* higher term, but if checkQuorum is true we may not advance the term on
* MsgVote and must generate other messages to advance the term. The net
* result of these two features is to minimize the disruption caused by
* nodes that have been removed from the cluster's configuration: a
* removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
* but it will not receive MsgApp or MsgHeartbeat, so it will not create
* disruptive term increases
**/
int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (peerIndex < 0) {
return true;
}
SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
if (msg == NULL) {
return true;
}
pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex]));
} else {
// ignore other cases
syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]",
pRaft->selfGroupId, pRaft->selfId, pRaft->term, pMsg->msgType, pMsg->from, pMsg->term);
}
return true;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft_configuration.h"
#include "raft.h"
int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) {
return (int)(id);
}
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft) {
return pRaft->cluster.replica;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_message.h"
void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
SyncTerm term;
bool preVote;
RaftMessageType voteMsgType;
if (syncRaftIsPromotable(pRaft)) {
syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
syncRaftBecomePreCandidate(pRaft);
preVote = true;
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = pRaft->term + 1;
} else {
syncRaftBecomeCandidate(pRaft);
voteMsgType = RAFT_MSG_VOTE;
term = pRaft->term;
preVote = false;
}
int quorum = syncRaftQuorum(pRaft);
SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL);
if (result == SYNC_RAFT_VOTE_WON) {
/**
* We won the election after voting for ourselves (which must mean that
* this is a single-node cluster). Advance to the next state.
**/
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
} else {
syncRaftBecomeLeader(pRaft);
}
return;
}
// broadcast vote message to other peers
int i;
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (i == pRaft->cluster.selfIndex) {
continue;
}
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId,
term, cType, lastIndex, lastTerm);
if (pMsg == NULL) {
continue;
}
syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent %d request to %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, lastTerm,
lastIndex, voteMsgType, nodeId, pRaft->term);
pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i]));
}
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_configuration.h"
#include "raft_message.h"
int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
const RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries);
int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (peerIndex < 0) {
return 0;
}
SSyncMessage* pRespMsg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
if (pRespMsg == NULL) {
return 0;
}
RaftMsg_Append_Entries *appendResp = &(pRespMsg->appendResp);
// ignore committed logs
if (syncRaftLogIsCommitted(pRaft->log, appendEntries->index)) {
appendResp->index = pRaft->log->commitIndex;
goto out;
}
syncInfo("[%d:%d] recv append from %d index %" PRId64"",
pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index);
out:
pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[peerIndex]));
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_message.h"
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (pRaft->state == TAOS_SYNC_ROLE_LEADER) {
syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
if (!syncRaftIsPromotable(pRaft)) {
syncDebug("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
// if there is pending uncommitted config,cannot start election
if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) {
syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply",
pRaft->selfGroupId, pRaft->selfId, pRaft->term);
return 0;
}
syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
if (pRaft->preVote) {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION);
} else {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
}
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_configuration.h"
#include "raft_log.h"
#include "raft_message.h"
static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
SSyncMessage* pRespMsg;
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (voteIndex == -1) {
return 0;
}
bool grant;
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
grant = canGrantVoteMessage(pRaft, pMsg);
pRespMsg = syncNewVoteRespMsg(pRaft->selfGroupId, pRaft->selfId, pMsg->vote.cType, !grant);
if (pRespMsg == NULL) {
return 0;
}
syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] %s for %d"\
"[logterm: %" PRId64 ", index: %" PRId64 "] at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, pRaft->voteFor,
grant ? "grant" : "reject",
pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term);
pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[voteIndex]));
return 0;
}
static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (!(pRaft->voteFor == SYNC_NON_NODE_ID || pMsg->term > pRaft->term || pRaft->voteFor == pMsg->from)) {
return false;
}
if (!syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) {
return false;
}
return true;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_configuration.h"
#include "raft_message.h"
int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
int granted, rejected;
int quorum;
int voterIndex;
assert(pRaft->state == TAOS_SYNC_ROLE_CANDIDATE);
voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (voterIndex == -1) {
syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from);
return 0;
}
if (pRaft->state != TAOS_SYNC_ROLE_CANDIDATE) {
syncError("[%d:%d] is not candidate, ignore vote resp", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pMsg->from,
pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION,
!pMsg->voteResp.rejected, &rejected, &granted);
syncInfo("[%d:%d] [quorum:%d] has received %d votes and %d vote rejections",
pRaft->selfGroupId, pRaft->selfId, quorum, granted, rejected);
if (result == SYNC_RAFT_VOTE_WON) {
if (pRaft->candidateState.inPreVote) {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
} else {
syncRaftBecomeLeader(pRaft);
}
} else if (result == SYNC_RAFT_VOTE_LOST) {
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
}
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft_log.h"
SSyncRaftLog* syncRaftLogOpen() {
return NULL;
}
SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog) {
return 0;
}
SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog) {
return 0;
}
SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) {
return 0;
}
bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term) {
return true;
}
int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog) {
return 0;
}
bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog) {
return pLog->commitIndex > pLog->appliedIndex;
}
SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index) {
return SYNC_NON_TERM;
}
int syncRaftLogAppend(SSyncRaftLog* pLog, SSyncRaftEntry *pEntries, int n) {
}
int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize,
SSyncRaftEntry **ppEntries, int *n) {
return 0;
}
void syncRaftLogRelease(SSyncRaftLog* pLog, SyncIndex index,
SSyncRaftEntry *pEntries, int n) {
return;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft_message.h"
void syncFreeMessage(const SSyncMessage* pMsg) {
if (!syncIsInternalMsg(pMsg->msgType)) {
free((SSyncMessage*)pMsg);
}
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft.h"
#include "raft_log.h"
#include "sync_raft_progress.h"
#include "raft_replication.h"
static int sendSnapshot(SSyncRaft* pRaft, int i);
static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm term);
int syncRaftReplicate(SSyncRaft* pRaft, int i) {
#if 0
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
assert(i >= 0 && i < pRaft->leaderState.nProgress);
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
if (syncRaftProgressIsPaused(progress)) {
syncInfo("node %d paused", nodeId);
return 0;
}
SyncIndex nextIndex = syncRaftProgressNextIndex(progress);
SyncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log);
bool inSnapshot = syncRaftProgressInSnapshot(progress);
SyncIndex prevIndex;
SyncTerm prevTerm;
/**
* From Section 3.5:
*
* When sending an AppendEntries RPC, the leader includes the index and
* term of the entry in its log that immediately precedes the new
* entries. If the follower does not find an entry in its log with the
* same index and term, then it refuses the new entries. The consistency
* check acts as an induction step: the initial empty state of the logs
* satisfies the Log Matching Property, and the consistency check
* preserves the Log Matching Property whenever logs are extended. As a
* result, whenever AppendEntries returns successfully, the leader knows
* that the follower's log is identical to its own log up through the new
* entries (Log Matching Property in Figure 3.2).
**/
if (nextIndex == 1) {
/**
* We're including the very first entry, so prevIndex and prevTerm are
* null. If the first entry is not available anymore, send the last
* snapshot if we're not already sending one.
**/
if (snapshotIndex > 0 && !inSnapshot) {
goto send_snapshot;
}
// otherwise send append entries from start
prevIndex = 0;
prevTerm = 0;
} else {
/**
* Set prevIndex and prevTerm to the index and term of the entry at
* nextIndex - 1.
**/
prevIndex = nextIndex - 1;
prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
/**
* If the entry is not anymore in our log, send the last snapshot if we're
* not doing so already.
**/
if (prevTerm == SYNC_NON_TERM && !inSnapshot) {
goto send_snapshot;
}
}
/* Send empty AppendEntries RPC when installing a snaphot */
if (inSnapshot) {
prevIndex = syncRaftLogLastIndex(pRaft->log);
prevTerm = syncRaftLogLastTerm(pRaft->log);
}
return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
send_snapshot:
if (syncRaftProgressRecentActive(progress)) {
/* Only send a snapshot when we have heard from the server */
return sendSnapshot(pRaft, i);
} else {
/* Send empty AppendEntries RPC when we haven't heard from the server */
prevIndex = syncRaftLogLastIndex(pRaft->log);
prevTerm = syncRaftLogLastTerm(pRaft->log);
return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
}
#endif
return 0;
}
static int sendSnapshot(SSyncRaft* pRaft, int i) {
return 0;
}
static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) {
#if 0
SyncIndex nextIndex = prevIndex + 1;
SSyncRaftEntry *entries;
int nEntry;
SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]);
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry);
SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term,
prevIndex, prevTerm, pRaft->log->commitIndex,
nEntry, entries);
if (msg == NULL) {
goto err_release_log;
}
pRaft->io.send(msg, pNode);
if (syncRaftProgressInReplicate(progress)) {
SyncIndex lastIndex = nextIndex + nEntry;
syncRaftProgressOptimisticNextIndex(progress, lastIndex);
syncRaftInflightAdd(&progress->inflights, lastIndex);
} else if (syncRaftProgressInProbe(progress)) {
syncRaftProgressPause(progress);
} else {
}
syncRaftProgressUpdateSendTick(progress, pRaft->currentTick);
return 0;
err_release_log:
syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry);
#endif
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
......@@ -13,15 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_RAFT_INT_H_
#define _TD_RAFT_INT_H_
#include "sync.h"
#include "raft_unstable_log.h"
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
/*
SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog) {
return 0;
}
#endif
#endif /*_TD_RAFT_INT_H_*/
\ No newline at end of file
*/
\ No newline at end of file
......@@ -13,14 +13,282 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync.h"
#include "syncInt.h"
#include "trpc.h"
#include "ttimer.h"
int32_t syncInit() { return 0; }
SSyncManager* gSyncManager = NULL;
void syncCleanUp() {}
#define SYNC_TICK_TIMER 50
#define SYNC_ACTIVITY_TIMER 5
#define SYNC_SERVER_WORKER 2
SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; }
static void syncProcessRsp(SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncProcessReqMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void syncStop(const SSyncNode* pNode) {}
static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyncCfg);
static int syncInitRpcClient(SSyncManager* syncManager);
static int syncOpenWorkerPool(SSyncManager* syncManager);
static int syncCloseWorkerPool(SSyncManager* syncManager);
static void *syncWorkerMain(void *argv);
static void syncNodeTick(void *param, void *tmrId);
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
\ No newline at end of file
int32_t syncInit() {
if (gSyncManager != NULL) {
return 0;
}
gSyncManager = (SSyncManager*)calloc(sizeof(SSyncManager), 0);
if (gSyncManager == NULL) {
syncError("malloc SSyncManager fail");
return -1;
}
pthread_mutex_init(&gSyncManager->mutex, NULL);
// init client rpc
if (syncInitRpcClient(gSyncManager) != 0) {
syncCleanUp();
return -1;
}
// init sync timer manager
gSyncManager->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
if (gSyncManager->syncTimerManager == NULL) {
syncCleanUp();
return -1;
}
// init worker pool
if (syncOpenWorkerPool(gSyncManager) != 0) {
syncCleanUp();
return -1;
}
// init vgroup hash table
gSyncManager->vgroupTable = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (gSyncManager->vgroupTable == NULL) {
syncCleanUp();
return -1;
}
return 0;
}
void syncCleanUp() {
if (gSyncManager == NULL) {
return;
}
pthread_mutex_lock(&gSyncManager->mutex);
if (gSyncManager->vgroupTable) {
taosHashCleanup(gSyncManager->vgroupTable);
}
if (gSyncManager->clientRpc) {
rpcClose(gSyncManager->clientRpc);
syncInfo("sync inter-sync rpc client is closed");
}
if (gSyncManager->syncTimerManager) {
taosTmrCleanUp(gSyncManager->syncTimerManager);
}
syncCloseWorkerPool(gSyncManager);
pthread_mutex_unlock(&gSyncManager->mutex);
pthread_mutex_destroy(&gSyncManager->mutex);
free(gSyncManager);
gSyncManager = NULL;
}
SSyncNode* syncStart(const SSyncInfo* pInfo) {
pthread_mutex_lock(&gSyncManager->mutex);
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId));
if (ppNode != NULL) {
syncInfo("vgroup %d already exist", pInfo->vgId);
pthread_mutex_unlock(&gSyncManager->mutex);
return *ppNode;
}
// init rpc server
if (syncInitRpcServer(gSyncManager, &pInfo->syncCfg) != 0) {
pthread_mutex_unlock(&gSyncManager->mutex);
return NULL;
}
SSyncNode *pNode = (SSyncNode*)malloc(sizeof(SSyncNode));
if (pNode == NULL) {
syncError("malloc vgroup %d node fail", pInfo->vgId);
pthread_mutex_unlock(&gSyncManager->mutex);
return NULL;
}
pNode->syncTimer = taosTmrStart(syncNodeTick, SYNC_TICK_TIMER, (void*)pInfo->vgId, gSyncManager->syncTimerManager);
// start raft
pNode->raft.pNode = pNode;
if (syncRaftStart(&pNode->raft, pInfo) != 0) {
syncError("raft start at %d node fail", pInfo->vgId);
pthread_mutex_unlock(&gSyncManager->mutex);
return NULL;
}
pthread_mutex_init(&pNode->mutex, NULL);
taosHashPut(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId), &pNode, sizeof(SSyncNode *));
pthread_mutex_unlock(&gSyncManager->mutex);
return NULL;
}
void syncStop(const SSyncNode* pNode) {
pthread_mutex_lock(&gSyncManager->mutex);
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
if (ppNode == NULL) {
syncInfo("vgroup %d not exist", pNode->vgId);
pthread_mutex_unlock(&gSyncManager->mutex);
return;
}
assert(*ppNode == pNode);
taosTmrStop(pNode->syncTimer);
taosHashRemove(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
pthread_mutex_unlock(&gSyncManager->mutex);
pthread_mutex_destroy(&((*ppNode)->mutex));
free(*ppNode);
}
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
SSyncMessage msg;
pthread_mutex_lock(&syncNode->mutex);
int32_t ret = syncRaftStep(&syncNode->raft, syncInitPropMsg(&msg, pBuf, pData, isWeak));
pthread_mutex_unlock(&syncNode->mutex);
return ret;
}
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
// process rpc rsp message from other sync server
static void syncProcessRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
}
// process rpc message from other sync server
static void syncProcessReqMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyncCfg) {
if (gSyncManager->rpcServerTable == NULL) {
gSyncManager->rpcServerTable = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (gSyncManager->rpcServerTable == NULL) {
syncError("init sync rpc server hash table error");
return -1;
}
}
assert(pSyncCfg->selfIndex < pSyncCfg->replica && pSyncCfg->selfIndex >= 0);
const SNodeInfo* pNode = &(pSyncCfg->nodeInfo[pSyncCfg->replica]);
char buffer[20] = {'\0'};
snprintf(buffer, sizeof(buffer), "%s:%d", &(pNode->nodeFqdn[0]), pNode->nodePort);
size_t len = strlen(buffer);
void** ppRpcServer = taosHashGet(gSyncManager->rpcServerTable, buffer, len);
if (ppRpcServer != NULL) {
// already inited
syncInfo("sync rpc server for %s already exist", buffer);
return 0;
}
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pNode->nodePort;
rpcInit.label = "sync-server";
rpcInit.numOfThreads = SYNC_SERVER_WORKER;
rpcInit.cfp = syncProcessReqMsg;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = SYNC_ACTIVITY_TIMER * 1000;
void* rpcServer = rpcOpen(&rpcInit);
if (rpcServer == NULL) {
syncInfo("rpcOpen for sync rpc server for %s fail", buffer);
return -1;
}
taosHashPut(gSyncManager->rpcServerTable, buffer, strlen(buffer), rpcServer, len);
syncInfo("sync rpc server for %s init success", buffer);
return 0;
}
static int syncInitRpcClient(SSyncManager* syncManager) {
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "sync-client";
rpcInit.numOfThreads = 1;
rpcInit.cfp = syncProcessRsp;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = SYNC_ACTIVITY_TIMER * 1000;
rpcInit.user = "t";
rpcInit.ckey = "key";
rpcInit.secret = secret;
syncManager->clientRpc = rpcOpen(&rpcInit);
if (syncManager->clientRpc == NULL) {
syncError("failed to init sync rpc client");
return -1;
}
syncInfo("sync inter-sync rpc client is initialized");
return 0;
}
static int syncOpenWorkerPool(SSyncManager* syncManager) {
int i;
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (i = 0; i < TAOS_SYNC_MAX_WORKER; ++i) {
SSyncWorker* pWorker = &(syncManager->worker[i]);
if (pthread_create(&(pWorker->thread), &thattr, (void *)syncWorkerMain, pWorker) != 0) {
syncError("failed to create sync worker since %s", strerror(errno));
return -1;
}
}
pthread_attr_destroy(&thattr);
return 0;
}
static int syncCloseWorkerPool(SSyncManager* syncManager) {
return 0;
}
static void *syncWorkerMain(void *argv) {
SSyncWorker* pWorker = (SSyncWorker *)argv;
taosBlockSIGPIPE();
setThreadName("syncWorker");
return NULL;
}
static void syncNodeTick(void *param, void *tmrId) {
SyncGroupId vgId = (SyncGroupId)param;
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId));
if (ppNode == NULL) {
return;
}
SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex);
syncRaftTick(&pNode->raft);
pthread_mutex_unlock(&pNode->mutex);
pNode->syncTimer = taosTmrStart(syncNodeTick, SYNC_TICK_TIMER, (void*)pNode->vgId, gSyncManager->syncTimerManager);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft.h"
#include "raft_configuration.h"
#include "raft_log.h"
#include "raft_replication.h"
#include "sync_raft_progress_tracker.h"
#include "syncInt.h"
static int convertClear(SSyncRaft* pRaft);
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int triggerAll(SSyncRaft* pRaft);
static void tickElection(SSyncRaft* pRaft);
static void tickHeartbeat(SSyncRaft* pRaft);
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);
static bool maybeCommit(SSyncRaft* pRaft);
static void abortLeaderTransfer(SSyncRaft* pRaft);
static void resetRaft(SSyncRaft* pRaft, SyncTerm term);
void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) {
convertClear(pRaft);
pRaft->stepFp = stepFollower;
resetRaft(pRaft, term);
pRaft->tickFp = tickElection;
pRaft->leaderId = leaderId;
pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
convertClear(pRaft);
/**
* Becoming a pre-candidate changes our step functions and state,
* but doesn't change anything else. In particular it does not increase
* r.Term or change r.Vote.
**/
pRaft->stepFp = stepCandidate;
pRaft->tickFp = tickElection;
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
pRaft->candidateState.inPreVote = true;
syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
convertClear(pRaft);
pRaft->candidateState.inPreVote = false;
pRaft->stepFp = stepCandidate;
// become candidate make term+1
resetRaft(pRaft, pRaft->term + 1);
pRaft->tickFp = tickElection;
pRaft->voteFor = pRaft->selfId;
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomeLeader(SSyncRaft* pRaft) {
assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER);
pRaft->stepFp = stepLeader;
resetRaft(pRaft, pRaft->term);
pRaft->leaderId = pRaft->leaderId;
pRaft->state = TAOS_SYNC_ROLE_LEADER;
// TODO: check if there is pending config log
int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
if (nPendingConf > 1) {
syncFatal("unexpected multiple uncommitted config entry");
}
syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
// after become leader, send a no-op log
SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry));
if (entry == NULL) {
return;
}
*entry = (SSyncRaftEntry) {
.buffer = (SSyncBuffer) {
.data = NULL,
.len = 0,
}
};
appendEntries(pRaft, entry, 1);
//syncRaftTriggerHeartbeat(pRaft);
}
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) {
triggerAll(pRaft);
}
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
// electionTimeoutTick in [3,6] tick
pRaft->randomizedElectionTimeout = taosRand() % 4 + 3;
}
bool syncRaftIsPromotable(SSyncRaft* pRaft) {
return pRaft->selfId != SYNC_NON_NODE_ID;
}
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
return pRaft->electionElapsed >= pRaft->randomizedElectionTimeout;
}
int syncRaftQuorum(SSyncRaft* pRaft) {
return pRaft->cluster.replica / 2 + 1;
}
SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
bool preVote, bool grant,
int* rejected, int *granted) {
int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
if (voterIndex == -1) {
return SYNC_RAFT_VOTE_PENDING;
}
if (grant) {
syncInfo("[%d:%d] received grant (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
} else {
syncInfo("[%d:%d] received rejection (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
}
syncRaftRecordVote(pRaft->tracker, voterIndex, grant);
return syncRaftTallyVotes(pRaft->tracker, rejected, granted);
}
/*
if (accept) {
syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
} else {
syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
}
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN);
pRaft->candidateState.votes[voteIndex] = accept ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
int granted = 0, rejected = 0;
int i;
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) granted++;
else if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_REJECT) rejected++;
}
if (rejectNum) *rejectNum = rejected;
return granted;
*/
static int convertClear(SSyncRaft* pRaft) {
}
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return 0;
}
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
/**
* Only handle vote responses corresponding to our candidacy (while in
* StateCandidate, we may get stale MsgPreVoteResp messages in this term from
* our pre-candidate state).
**/
RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_INTERNAL_PROP) {
return 0;
}
if (msgType == RAFT_MSG_VOTE_RESP) {
syncRaftHandleVoteRespMessage(pRaft, pMsg);
return 0;
} else if (msgType == RAFT_MSG_APPEND) {
syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from);
syncRaftHandleAppendEntriesMessage(pRaft, pMsg);
}
return 0;
}
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
convertClear(pRaft);
return 0;
}
/**
* tickElection is run by followers and candidates per tick.
**/
static void tickElection(SSyncRaft* pRaft) {
pRaft->electionElapsed += 1;
if (!syncRaftIsPromotable(pRaft)) {
return;
}
if (!syncRaftIsPastElectionTimeout(pRaft)) {
return;
}
// election timeout
pRaft->electionElapsed = 0;
SSyncMessage msg;
syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId));
}
static void tickHeartbeat(SSyncRaft* pRaft) {
}
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm term = pRaft->term;
int i;
for (i = 0; i < n; ++i) {
entries[i].term = term;
entries[i].index = lastIndex + 1 + i;
}
syncRaftLogAppend(pRaft->log, entries, n);
SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]);
syncRaftProgressMaybeUpdate(progress, lastIndex);
// Regardless of maybeCommit's return, our caller will call bcastAppend.
maybeCommit(pRaft);
}
/**
* maybeCommit attempts to advance the commit index. Returns true if
* the commit index changed (in which case the caller should call
* r.bcastAppend).
**/
static bool maybeCommit(SSyncRaft* pRaft) {
return true;
}
/**
* trigger I/O requests for newly appended log entries or heartbeats.
**/
static int triggerAll(SSyncRaft* pRaft) {
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
int i;
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (i == pRaft->cluster.selfIndex) {
continue;
}
syncRaftReplicate(pRaft, i);
}
}
static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID;
}
static void initProgress(int i, SSyncRaftProgress* progress, void* arg) {
syncRaftInitProgress(i, (SSyncRaft*)arg, progress);
}
static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
if (pRaft->term != term) {
pRaft->term = term;
pRaft->voteFor = SYNC_NON_NODE_ID;
}
pRaft->leaderId = SYNC_NON_NODE_ID;
pRaft->electionElapsed = 0;
pRaft->heartbeatElapsed = 0;
syncRaftRandomizedElectionTimeout(pRaft);
abortLeaderTransfer(pRaft);
syncRaftResetVotes(pRaft->tracker);
syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft);
pRaft->pendingConfigIndex = 0;
pRaft->uncommittedSize = 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http: *www.gnu.org/licenses/>.
*/
#include "sync_raft_inflights.h"
SSyncRaftInflights* syncRaftOpenInflights(int size) {
SSyncRaftInflights* inflights = (SSyncRaftInflights*)malloc(sizeof(SSyncRaftInflights));
if (inflights == NULL) {
return NULL;
}
SyncIndex* buffer = (SyncIndex*)malloc(sizeof(SyncIndex) * size);
if (buffer == NULL) {
free(inflights);
return NULL;
}
*inflights = (SSyncRaftInflights) {
.buffer = buffer,
.count = 0,
.size = 0,
.start = 0,
};
return inflights;
}
void syncRaftCloseInflights(SSyncRaftInflights* inflights) {
free(inflights->buffer);
free(inflights);
}
/**
* syncRaftInflightAdd notifies the Inflights that a new message with the given index is being
* dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd()
* to verify that there is room for one more message,
* and consecutive calls to add syncRaftInflightAdd() must provide a
* monotonic sequence of indexes.
**/
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) {
assert(!syncRaftInflightFull(inflights));
int next = inflights->start + inflights->count;
int size = inflights->size;
/* is next wrapped around buffer? */
if (next >= size) {
next -= size;
}
inflights->buffer[next] = inflightIndex;
inflights->count++;
}
/**
* syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
**/
void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) {
if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) {
/* out of the left side of the window */
return;
}
int i, idx;
for (i = 0, idx = inflights->start; i < inflights->count; i++) {
if (toIndex < inflights->buffer[idx]) { // found the first large inflight
break;
}
// increase index and maybe rotate
int size = inflights->size;
idx++;
if (idx >= size) {
idx -= size;
}
}
// free i inflights and set new start index
inflights->count -= i;
inflights->start = idx;
assert(inflights->count >= 0);
if (inflights->count == 0) {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
inflights->start = 0;
}
}
/**
* syncRaftInflightFreeFirstOne releases the first inflight.
* This is a no-op if nothing is inflight.
**/
void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) {
syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft.h"
#include "raft_log.h"
#include "sync_raft_progress.h"
#include "sync_raft_progress_tracker.h"
#include "sync.h"
#include "syncInt.h"
static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state);
static void probeAcked(SSyncRaftProgress* progress);
static void resumeProgress(SSyncRaftProgress* progress);
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) {
SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflight);
if (inflights == NULL) {
return;
}
*progress = (SSyncRaftProgress) {
.matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0,
.nextIndex = syncRaftLogLastIndex(pRaft->log) + 1,
.inflights = inflights,
.isLearner = false,
.state = PROGRESS_STATE_PROBE,
};
}
/**
* syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the
* index acked by it. The method returns false if the given n index comes from
* an outdated message. Otherwise it updates the progress and returns true.
**/
bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) {
bool updated = false;
if (progress->matchIndex < lastIndex) {
progress->matchIndex = lastIndex;
updated = true;
probeAcked(progress);
}
progress->nextIndex = MAX(progress->nextIndex, lastIndex + 1);
return updated;
}
bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
SyncIndex rejected, SyncIndex matchHint) {
if (progress->state == PROGRESS_STATE_REPLICATE) {
/**
* the rejection must be stale if the progress has matched and "rejected"
* is smaller than "match".
**/
if (rejected <= progress->matchIndex) {
syncDebug("match index is up to date,ignore");
return false;
}
/* directly decrease next to match + 1 */
progress->nextIndex = progress->matchIndex + 1;
return true;
}
/**
* The rejection must be stale if "rejected" does not match next - 1. This
* is because non-replicating followers are probed one entry at a time.
**/
if (rejected != progress->nextIndex - 1) {
syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore"
, rejected, progress->nextIndex);
return false;
}
progress->nextIndex = MAX(MIN(rejected, matchHint + 1), 1);
progress->probeSent = false;
return true;
}
/**
* syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled.
* This is done when a node has rejected recent MsgApps, is currently waiting
* for a snapshot, or has reached the MaxInflightMsgs limit. In normal
* operation, this is false. A throttled node will be contacted less frequently
* until it has reached a state in which it's able to accept a steady stream of
* log entries again.
**/
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
switch (progress->state) {
case PROGRESS_STATE_PROBE:
return progress->probeSent;
case PROGRESS_STATE_REPLICATE:
return syncRaftInflightFull(progress->inflights);
case PROGRESS_STATE_SNAPSHOT:
return true;
default:
syncFatal("error sync state:%d", progress->state);
}
}
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
}
/**
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
* optionally and if larger, the index of the pending snapshot.
**/
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) {
/**
* If the original state is ProgressStateSnapshot, progress knows that
* the pending snapshot has been sent to this peer successfully, then
* probes from pendingSnapshot + 1.
**/
if (progress->state == PROGRESS_STATE_SNAPSHOT) {
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = MAX(progress->matchIndex + 1, pendingSnapshotIndex + 1);
} else {
resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = progress->matchIndex + 1;
}
}
/**
* syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
**/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) {
resetProgressState(progress, PROGRESS_STATE_REPLICATE);
progress->nextIndex = progress->matchIndex + 1;
}
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) {
resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
progress->pendingSnapshotIndex = snapshotIndex;
}
/**
* ResetState moves the Progress into the specified State, resetting ProbeSent,
* PendingSnapshot, and Inflights.
**/
static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) {
progress->probeSent = false;
progress->pendingSnapshotIndex = 0;
progress->state = state;
syncRaftInflightReset(progress->inflights);
}
/**
* probeAcked is called when this peer has accepted an append. It resets
* ProbeSent to signal that additional append messages should be sent without
* further delay.
**/
static void probeAcked(SSyncRaftProgress* progress) {
progress->probeSent = false;
}
#if 0
SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].nextIndex;
}
SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].matchIndex;
}
void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft);
}
void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft);
}
bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
bool prev = progress->recentRecv;
progress->recentRecv = false;
return prev;
}
void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].recentRecv = true;
}
bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].recentRecv;
}
void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log);
}
void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
if (progress->state == PROGRESS_STATE_SNAPSHOT) {
assert(progress->pendingSnapshotIndex > 0);
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex);
} else {
resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = progress->matchIndex + 1;
}
}
void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) {
resetProgressState(pRaft->leaderState.progress, PROGRESS_STATE_REPLICATE);
pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1;
}
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
progress->pendingSnapshotIndex = 0;
progress->state = PROGRESS_STATE_PROBE;
}
RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].state;
}
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync_raft_progress_tracker.h"
SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker));
if (tracker == NULL) {
return NULL;
}
return tracker;
}
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteResult) * TSDB_MAX_REPLICA);
}
void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
int i;
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncRaftProgress* progress = &(tracker->progressMap[i]);
visit(i, progress, arg);
}
}
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) {
if (tracker->votes[i] != SYNC_RAFT_VOTE_RESP_UNKNOWN) {
return;
}
tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
}
/**
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
* election outcome is known.
**/
SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) {
int i;
SSyncRaftProgress* progress;
int r, g;
for (i = 0, r = 0, g = 0; i < TSDB_MAX_REPLICA; ++i) {
progress = &(tracker->progressMap[i]);
if (progress->id == SYNC_NON_NODE_ID) {
continue;
}
if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) {
continue;
}
if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) {
g++;
} else {
r++;
}
}
if (rejected) *rejected = r;
if (granted) *granted = g;
return syncRaftVoteResult(&(tracker->config.voters), tracker->votes);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync_raft_quorum_majority.h"
#include "sync_raft_quorum_joint.h"
#include "sync_raft_quorum.h"
/**
* syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending, lost, or won. A joint quorum
* requires both majority quorums to vote in favor.
**/
SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes) {
SyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes);
SyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes);
if (r1 == r2) {
// If they agree, return the agreed state.
return r1;
}
if (r1 == SYNC_RAFT_VOTE_LOST || r2 == SYNC_RAFT_VOTE_LOST) {
// If either config has lost, loss is the only possible outcome.
return SYNC_RAFT_VOTE_LOST;
}
// One side won, the other one is pending, so the whole outcome is.
return SYNC_RAFT_VOTE_PENDING;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync_raft_quorum.h"
#include "sync_raft_quorum_majority.h"
/**
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending (i.e. neither a quorum of
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached).
**/
SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes) {
if (config->replica == 0) {
return SYNC_RAFT_VOTE_WON;
}
int i, g, r, missing;
for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) {
if (config->nodeInfo[i].nodeId == SYNC_NON_NODE_ID) {
continue;
}
if (votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) {
missing += 1;
} else if (votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) {
g +=1;
} else {
r += 1;
}
}
int quorum = config->replica / 2 + 1;
if (g >= quorum) {
return SYNC_RAFT_VOTE_WON;
}
if (r + missing >= quorum) {
return SYNC_RAFT_VOTE_PENDING;
}
return SYNC_RAFT_VOTE_LOST;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册