syncInt.h 11.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_INT_H
#define _TD_LIBS_SYNC_INT_H

#ifdef __cplusplus
extern "C" {
#endif

M
Minghao Li 已提交
23
#include "sync.h"
S
Shengliang Guan 已提交
24
#include "taosdef.h"
25
#include "trpc.h"
M
Minghao Li 已提交
26
#include "ttimer.h"
M
Minghao Li 已提交
27

28 29 30 31 32
// Forward declarations of the sync module's struct types, so that they can be
// referenced by pointer before (or without) their full definitions being
// visible. Some of them (SRaftCfg, SSyncHbTimerData, SSyncTimer) are defined
// later in this header; the rest are defined elsewhere.
typedef struct SyncTimeout            SyncTimeout;
typedef struct SyncClientRequest      SyncClientRequest;
typedef struct SyncRequestVote        SyncRequestVote;
typedef struct SyncRequestVoteReply   SyncRequestVoteReply;
typedef struct SyncAppendEntries      SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
typedef struct SSyncEnv               SSyncEnv;
typedef struct SRaftStore             SRaftStore;
typedef struct SVotesGranted          SVotesGranted;
typedef struct SVotesRespond          SVotesRespond;
typedef struct SSyncIndexMgr          SSyncIndexMgr;
typedef struct SRaftCfg               SRaftCfg;
typedef struct SSyncRespMgr           SSyncRespMgr;
typedef struct SSyncSnapshotSender    SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver  SSyncSnapshotReceiver;
typedef struct SSyncTimer             SSyncTimer;
typedef struct SSyncHbTimerData       SSyncHbTimerData;
typedef struct SyncSnapshotSend       SyncSnapshotSend;
typedef struct SyncSnapshotRsp        SyncSnapshotRsp;
typedef struct SyncLocalCmd           SyncLocalCmd;
typedef struct SyncAppendEntriesBatch SyncAppendEntriesBatch;
typedef struct SyncPreSnapshotReply   SyncPreSnapshotReply;
typedef struct SyncHeartbeatReply     SyncHeartbeatReply;
typedef struct SyncHeartbeat          SyncHeartbeat;
typedef struct SyncPreSnapshot        SyncPreSnapshot;
typedef struct SSyncLogBuffer         SSyncLogBuffer;
typedef struct SSyncLogReplMgr        SSyncLogReplMgr;
55

56 57 58 59 60 61 62 63 64 65 66 67
// Capacity of SRaftCfg.configIndexArr.
#define MAX_CONFIG_INDEX_COUNT 256

// Raft configuration record: the SSyncCfg together with a few extra settings
// and a fixed-capacity array of config indexes.
typedef struct SRaftCfg {
  SSyncCfg  cfg;
  int32_t   batchSize;
  int8_t    isStandBy;
  int8_t    snapshotStrategy;
  SyncIndex lastConfigIndex;
  // NOTE(review): presumably the number of used slots in configIndexArr -- confirm
  int32_t   configIndexCount;
  SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
} SRaftCfg;

68 69 70 71
// Pairs a node id (addr) with a group id (vgId). Used throughout this header
// to name a raft member: the local node, its peers, and message destinations.
typedef struct SRaftId {
  SyncNodeId  addr;
  SyncGroupId vgId;
} SRaftId;
72

73
typedef struct SSyncHbTimerData {
74
  int64_t     syncNodeRid;
75 76 77
  SSyncTimer* pTimer;
  SRaftId     destId;
  uint64_t    logicClock;
78
  int64_t     execTime;
79
  int64_t     rid;
80 81 82 83 84 85 86 87
} SSyncHbTimerData;

typedef struct SSyncTimer {
  void*             pTimer;
  TAOS_TMR_CALLBACK timerCb;
  uint64_t          logicClock;
  uint64_t          counter;
  int32_t           timerMS;
M
Minghao Li 已提交
88
  int64_t           timeStamp;
89
  SRaftId           destId;
90
  int64_t           hbDataRid;
91 92
} SSyncTimer;

93
typedef struct SElectTimerParam {
M
Minghao Li 已提交
94 95
  uint64_t   logicClock;
  SSyncNode* pSyncNode;
96
  int64_t    executeTime;
M
Minghao Li 已提交
97
  void*      pData;
98
} SElectTimerParam;
99

M
Minghao Li 已提交
100 101 102 103 104
// Per-peer send bookkeeping; SSyncNode keeps one entry per replica slot in
// peerStates.
typedef struct SPeerState {
  SyncIndex lastSendIndex;
  int64_t   lastSendTime;  // NOTE(review): time unit not visible here
} SPeerState;

M
Minghao Li 已提交
105
typedef struct SSyncNode {
M
syncInt  
Minghao Li 已提交
106
  // init by SSyncInfo
M
Minghao Li 已提交
107
  SyncGroupId vgId;
108
  SRaftCfg    raftCfg;
M
Minghao Li 已提交
109
  char        path[TSDB_FILENAME_LEN];
M
Minghao Li 已提交
110
  char        raftStorePath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
111
  char        configPath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
112 113

  // sync io
B
Benguang Zhao 已提交
114
  SSyncLogBuffer* pLogBuf;
S
Shengliang Guan 已提交
115 116
  SWal*         pWal;
  const SMsgCb* msgcb;
S
Shengliang Guan 已提交
117 118 119
  int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
  int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
  int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
M
Minghao Li 已提交
120

M
syncInt  
Minghao Li 已提交
121
  // init internal
M
Minghao Li 已提交
122 123
  SNodeInfo myNodeInfo;
  SRaftId   myRaftId;
M
Minghao Li 已提交
124

M
Minghao Li 已提交
125
  int32_t   peersNum;
M
Minghao Li 已提交
126
  SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA];
127
  SEpSet    peersEpset[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
128 129 130 131
  SRaftId   peersId[TSDB_MAX_REPLICA];

  int32_t replicaNum;
  SRaftId replicasId[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
132

M
syncInt  
Minghao Li 已提交
133 134 135
  // raft algorithm
  SSyncFSM* pFsm;
  int32_t   quorum;
M
Minghao Li 已提交
136
  SRaftId   leaderCache;
M
syncInt  
Minghao Li 已提交
137 138 139 140 141 142 143 144 145

  // life cycle
  int64_t rid;

  // tla+ server vars
  ESyncState  state;
  SRaftStore* pRaftStore;

  // tla+ candidate vars
M
Minghao Li 已提交
146 147
  SVotesGranted* pVotesGranted;
  SVotesRespond* pVotesRespond;
M
Minghao Li 已提交
148

M
syncInt  
Minghao Li 已提交
149
  // tla+ leader vars
M
Minghao Li 已提交
150 151
  SSyncIndexMgr* pNextIndex;
  SSyncIndexMgr* pMatchIndex;
M
syncInt  
Minghao Li 已提交
152 153 154 155 156

  // tla+ log vars
  SSyncLogStore* pLogStore;
  SyncIndex      commitIndex;

M
Minghao Li 已提交
157 158 159 160 161
  // timer ms init
  int32_t pingBaseLine;
  int32_t electBaseLine;
  int32_t hbBaseLine;

M
Minghao Li 已提交
162
  // ping timer
M
Minghao Li 已提交
163 164
  tmr_h             pPingTimer;
  int32_t           pingTimerMS;
M
Minghao Li 已提交
165 166
  uint64_t          pingTimerLogicClock;
  uint64_t          pingTimerLogicClockUser;
M
Minghao Li 已提交
167
  TAOS_TMR_CALLBACK FpPingTimerCB;  // Timer Fp
M
Minghao Li 已提交
168 169
  uint64_t          pingTimerCounter;

M
Minghao Li 已提交
170
  // elect timer
M
Minghao Li 已提交
171 172
  tmr_h             pElectTimer;
  int32_t           electTimerMS;
M
Minghao Li 已提交
173
  uint64_t          electTimerLogicClock;
M
Minghao Li 已提交
174
  TAOS_TMR_CALLBACK FpElectTimerCB;  // Timer Fp
M
Minghao Li 已提交
175
  uint64_t          electTimerCounter;
176
  SElectTimerParam  electTimerParam;
M
Minghao Li 已提交
177

M
Minghao Li 已提交
178
  // heartbeat timer
M
Minghao Li 已提交
179 180
  tmr_h             pHeartbeatTimer;
  int32_t           heartbeatTimerMS;
M
Minghao Li 已提交
181 182
  uint64_t          heartbeatTimerLogicClock;
  uint64_t          heartbeatTimerLogicClockUser;
M
Minghao Li 已提交
183
  TAOS_TMR_CALLBACK FpHeartbeatTimerCB;  // Timer Fp
M
Minghao Li 已提交
184 185
  uint64_t          heartbeatTimerCounter;

186 187 188
  // peer heartbeat timer
  SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA];

M
Minghao Li 已提交
189 190 191
  // tools
  SSyncRespMgr* pSyncRespMgr;

192
  // restore state
B
Benguang Zhao 已提交
193
  bool restoreFinish;
194
  // SSnapshot*             pSnapshot;
M
Minghao Li 已提交
195
  SSyncSnapshotSender*   senders[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
196
  SSyncSnapshotReceiver* pNewNodeReceiver;
197

B
Benguang Zhao 已提交
198 199 200
  // log replication mgr
  SSyncLogReplMgr* logReplMgrs[TSDB_MAX_REPLICA];

M
Minghao Li 已提交
201 202
  SPeerState peerStates[TSDB_MAX_REPLICA];

M
Minghao Li 已提交
203 204
  // is config changing
  bool changing;
205

206
  int64_t snapshottingIndex;
M
Minghao Li 已提交
207 208
  int64_t snapshottingTime;
  int64_t minMatchIndex;
209

M
Minghao Li 已提交
210
  int64_t startTime;
211
  int64_t leaderTime;
M
Minghao Li 已提交
212 213
  int64_t lastReplicateTime;

214 215 216
  int32_t electNum;
  int32_t becomeLeaderNum;
  int32_t configChangeNum;
217 218
  int32_t hbSlowNum;
  int32_t hbrSlowNum;
M
Minghao Li 已提交
219
  int32_t tmrRoutineNum;
220

221 222
  bool isStart;

M
Minghao Li 已提交
223 224
} SSyncNode;

M
Minghao Li 已提交
225
// open/close --------------
226
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
B
Benguang Zhao 已提交
227 228
int32_t    syncNodeStart(SSyncNode* pSyncNode);
int32_t    syncNodeStartStandBy(SSyncNode* pSyncNode);
M
syncInt  
Minghao Li 已提交
229
void       syncNodeClose(SSyncNode* pSyncNode);
M
Minghao Li 已提交
230
void       syncNodePreClose(SSyncNode* pSyncNode);
S
Shengliang Guan 已提交
231
int32_t    syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq);
B
Benguang Zhao 已提交
232
int32_t    syncNodeRestore(SSyncNode* pSyncNode);
233
void       syncHbTimerDataFree(SSyncHbTimerData* pData);
M
Minghao Li 已提交
234

S
Shengliang Guan 已提交
235 236 237 238 239 240 241 242
// on message ---------------------
// One handler per message kind. Each takes the node and the incoming SRpcMsg.
// Only syncNodeOnClientRequest takes a non-const message and an additional
// SyncIndex pointer (pRetIndex).
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
M
Minghao Li 已提交
247

M
Minghao Li 已提交
248
// timer control --------------
// Start / stop / restart entry points for the node's ping, elect and
// heartbeat timers. The elect-timer start and restart calls take an explicit
// `ms` argument; the others take only the node.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
void    syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
M
Minghao Li 已提交
258 259

// utils --------------
M
Minghao Li 已提交
260 261
int32_t   syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
262
int32_t   syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
263
bool      syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode);
264 265
bool      syncNodeSnapshotSending(SSyncNode* pSyncNode);
bool      syncNodeSnapshotRecving(SSyncNode* pSyncNode);
266
bool      syncNodeIsReadyForRead(SSyncNode* pSyncNode);
M
Minghao Li 已提交
267

M
Minghao Li 已提交
268 269
// raft state change --------------
// Term updates and role transitions. The BecomeFollower / BecomeLeader calls
// take a `debugStr` argument.
// NOTE(review): presumably debugStr is used for logging only -- confirm.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);

// raft vote --------------
// syncNodeVoteForTerm() takes the term and the SRaftId involved in the vote;
// syncNodeVoteForSelf() takes only the node.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);

B
Benguang Zhao 已提交
283 284 285
// log replication --------------
// Looks up a log replication manager by destination id.
// NOTE(review): presumably returns an element of SSyncNode.logReplMgrs, and
// the result when pDestId matches no peer is not visible here -- confirm.
SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId);

M
Minghao Li 已提交
286
// snapshot --------------
// Snapshot query, commit adjustment, and starting a snapshot towards the
// member named by pDestId.
bool    syncNodeHasSnapshot(SSyncNode* pSyncNode);
void    syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId);
M
Minghao Li 已提交
290

291
// Accessors for the last index/term and for the index/term preceding a given
// index. The *IndexTerm variants take pointer arguments for both values.
// NOTE(review): presumably those pointers are out-parameters and the int32_t
// result is a status code -- confirm against the implementation.
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
SyncTerm  syncNodeGetLastTerm(SSyncNode* pSyncNode);
int32_t   syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode);
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm  syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
int32_t   syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
M
Minghao Li 已提交
298

M
Minghao Li 已提交
299
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
M
Minghao Li 已提交
300
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex);
301
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
302

M
Minghao Li 已提交
303 304
bool                 syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
305
SSyncTimer*          syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId);
M
Minghao Li 已提交
306 307
SPeerState*          syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId);
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
308

M
Minghao Li 已提交
309 310
// Snapshot meta lookups addressed by `rid` rather than by SSyncNode pointer;
// the ByIndex variant additionally takes a snapshot index.
// NOTE(review): presumably sMeta is an out-parameter -- confirm.
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
311

312
// Quorum computation, node-kind query, and initialisation of per-peer state.
// NOTE(review): presumably syncNodePeerStateInit() initialises
// SSyncNode.peerStates -- confirm.
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
bool    syncNodeIsMnode(SSyncNode* pSyncNode);
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
M
Minghao Li 已提交
315

M
Minghao Li 已提交
316 317 318 319 320
#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_INT_H*/