syncInt.h 11.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_INT_H
#define _TD_LIBS_SYNC_INT_H

#ifdef __cplusplus
extern "C" {
#endif

M
Minghao Li 已提交
23
#include "sync.h"
S
Shengliang Guan 已提交
24
#include "taosdef.h"
25
#include "trpc.h"
M
Minghao Li 已提交
26
#include "ttimer.h"
M
Minghao Li 已提交
27

28 29 30 31 32
// Forward declarations of the struct types referred to by this header.
// SSyncTimer and SSyncHbTimerData are completed further down in this file;
// the remaining definitions are not part of this header section.
typedef struct SyncTimeout            SyncTimeout;
typedef struct SyncClientRequest      SyncClientRequest;
typedef struct SyncRequestVote        SyncRequestVote;
typedef struct SyncRequestVoteReply   SyncRequestVoteReply;
typedef struct SyncAppendEntries      SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
typedef struct SSyncEnv               SSyncEnv;
typedef struct SVotesGranted          SVotesGranted;
typedef struct SVotesRespond          SVotesRespond;
typedef struct SSyncIndexMgr          SSyncIndexMgr;
typedef struct SSyncRespMgr           SSyncRespMgr;
typedef struct SSyncSnapshotSender    SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver  SSyncSnapshotReceiver;
typedef struct SSyncTimer             SSyncTimer;
typedef struct SSyncHbTimerData       SSyncHbTimerData;
typedef struct SyncSnapshotSend       SyncSnapshotSend;
typedef struct SyncSnapshotRsp        SyncSnapshotRsp;
typedef struct SyncLocalCmd           SyncLocalCmd;
typedef struct SyncAppendEntriesBatch SyncAppendEntriesBatch;
typedef struct SyncPreSnapshotReply   SyncPreSnapshotReply;
typedef struct SyncHeartbeatReply     SyncHeartbeatReply;
typedef struct SyncHeartbeat          SyncHeartbeat;
typedef struct SyncPreSnapshot        SyncPreSnapshot;
typedef struct SSyncLogBuffer         SSyncLogBuffer;
typedef struct SSyncLogReplMgr        SSyncLogReplMgr;
53

54 55 56
#define MAX_CONFIG_INDEX_COUNT 256

typedef struct SRaftCfg {
C
cadem 已提交
57 58 59 60 61 62 63
  SSyncCfg          cfg;
  int32_t           batchSize;
  int8_t            isStandBy;
  int8_t            snapshotStrategy;
  SyncIndex         lastConfigIndex;
  int32_t           configIndexCount;
  SyncIndex         configIndexArr[MAX_CONFIG_INDEX_COUNT];
64 65
} SRaftCfg;

66 67 68 69
// Identifier of a raft member: a node id (addr) paired with a group id (vgId).
typedef struct SRaftId {
  SyncNodeId  addr;
  SyncGroupId vgId;
} SRaftId;
70

S
Shengliang Guan 已提交
71 72 73
typedef struct SRaftStore {
  SyncTerm currentTerm;
  SRaftId  voteFor;
74
  TdThreadMutex mutex;
S
Shengliang Guan 已提交
75 76
} SRaftStore;

77
typedef struct SSyncHbTimerData {
78
  int64_t     syncNodeRid;
79 80 81
  SSyncTimer* pTimer;
  SRaftId     destId;
  uint64_t    logicClock;
82
  int64_t     execTime;
83
  int64_t     rid;
84 85 86 87 88 89 90 91
} SSyncHbTimerData;

typedef struct SSyncTimer {
  void*             pTimer;
  TAOS_TMR_CALLBACK timerCb;
  uint64_t          logicClock;
  uint64_t          counter;
  int32_t           timerMS;
M
Minghao Li 已提交
92
  int64_t           timeStamp;
93
  SRaftId           destId;
94
  int64_t           hbDataRid;
95 96
} SSyncTimer;

97
typedef struct SElectTimerParam {
M
Minghao Li 已提交
98 99
  uint64_t   logicClock;
  SSyncNode* pSyncNode;
100
  int64_t    executeTime;
M
Minghao Li 已提交
101
  void*      pData;
102
} SElectTimerParam;
103

M
Minghao Li 已提交
104 105 106 107 108
// Per-peer send bookkeeping: the last index sent and the time of that send.
// SSyncNode keeps one entry per peer slot in peerStates.
typedef struct SPeerState {
  SyncIndex lastSendIndex;
  int64_t   lastSendTime;  // time unit is not visible in this header
} SPeerState;

M
Minghao Li 已提交
109
typedef struct SSyncNode {
M
syncInt  
Minghao Li 已提交
110
  // init by SSyncInfo
M
Minghao Li 已提交
111
  SyncGroupId vgId;
112
  SRaftCfg    raftCfg;
M
Minghao Li 已提交
113
  char        path[TSDB_FILENAME_LEN];
M
Minghao Li 已提交
114
  char        raftStorePath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
115
  char        configPath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
116 117

  // sync io
B
Benguang Zhao 已提交
118
  SSyncLogBuffer* pLogBuf;
S
Shengliang Guan 已提交
119 120
  SWal*           pWal;
  const SMsgCb*   msgcb;
S
Shengliang Guan 已提交
121 122 123
  int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
  int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
  int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
M
Minghao Li 已提交
124

M
syncInt  
Minghao Li 已提交
125
  // init internal
M
Minghao Li 已提交
126 127
  SNodeInfo myNodeInfo;
  SRaftId   myRaftId;
M
Minghao Li 已提交
128

M
Minghao Li 已提交
129
  int32_t   peersNum;
C
cadem 已提交
130 131 132
  SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  SEpSet    peersEpset[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  SRaftId   peersId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
M
Minghao Li 已提交
133 134

  int32_t replicaNum;
C
cadem 已提交
135 136
  int32_t totalReplicaNum;
  SRaftId replicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
M
Minghao Li 已提交
137

M
syncInt  
Minghao Li 已提交
138 139 140
  // raft algorithm
  SSyncFSM* pFsm;
  int32_t   quorum;
M
Minghao Li 已提交
141
  SRaftId   leaderCache;
M
syncInt  
Minghao Li 已提交
142 143 144 145 146

  // life cycle
  int64_t rid;

  // tla+ server vars
S
Shengliang Guan 已提交
147 148
  ESyncState state;
  SRaftStore raftStore;
M
syncInt  
Minghao Li 已提交
149 150

  // tla+ candidate vars
M
Minghao Li 已提交
151 152
  SVotesGranted* pVotesGranted;
  SVotesRespond* pVotesRespond;
M
Minghao Li 已提交
153

M
syncInt  
Minghao Li 已提交
154
  // tla+ leader vars
M
Minghao Li 已提交
155 156
  SSyncIndexMgr* pNextIndex;
  SSyncIndexMgr* pMatchIndex;
M
syncInt  
Minghao Li 已提交
157 158 159 160 161

  // tla+ log vars
  SSyncLogStore* pLogStore;
  SyncIndex      commitIndex;

M
Minghao Li 已提交
162 163 164 165 166
  // timer ms init
  int32_t pingBaseLine;
  int32_t electBaseLine;
  int32_t hbBaseLine;

M
Minghao Li 已提交
167
  // ping timer
M
Minghao Li 已提交
168 169
  tmr_h             pPingTimer;
  int32_t           pingTimerMS;
M
Minghao Li 已提交
170 171
  uint64_t          pingTimerLogicClock;
  uint64_t          pingTimerLogicClockUser;
M
Minghao Li 已提交
172
  TAOS_TMR_CALLBACK FpPingTimerCB;  // Timer Fp
M
Minghao Li 已提交
173 174
  uint64_t          pingTimerCounter;

M
Minghao Li 已提交
175
  // elect timer
M
Minghao Li 已提交
176 177
  tmr_h             pElectTimer;
  int32_t           electTimerMS;
M
Minghao Li 已提交
178
  uint64_t          electTimerLogicClock;
M
Minghao Li 已提交
179
  TAOS_TMR_CALLBACK FpElectTimerCB;  // Timer Fp
M
Minghao Li 已提交
180
  uint64_t          electTimerCounter;
181
  SElectTimerParam  electTimerParam;
M
Minghao Li 已提交
182

M
Minghao Li 已提交
183
  // heartbeat timer
M
Minghao Li 已提交
184 185
  tmr_h             pHeartbeatTimer;
  int32_t           heartbeatTimerMS;
M
Minghao Li 已提交
186 187
  uint64_t          heartbeatTimerLogicClock;
  uint64_t          heartbeatTimerLogicClockUser;
M
Minghao Li 已提交
188
  TAOS_TMR_CALLBACK FpHeartbeatTimerCB;  // Timer Fp
M
Minghao Li 已提交
189 190
  uint64_t          heartbeatTimerCounter;

191
  // peer heartbeat timer
C
cadem 已提交
192
  SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
193

M
Minghao Li 已提交
194 195 196
  // tools
  SSyncRespMgr* pSyncRespMgr;

197
  // restore state
B
Benguang Zhao 已提交
198
  bool restoreFinish;
199
  // SSnapshot*             pSnapshot;
C
cadem 已提交
200
  SSyncSnapshotSender*   senders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
M
Minghao Li 已提交
201
  SSyncSnapshotReceiver* pNewNodeReceiver;
202

B
Benguang Zhao 已提交
203
  // log replication mgr
C
cadem 已提交
204
  SSyncLogReplMgr* logReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
B
Benguang Zhao 已提交
205

C
cadem 已提交
206
  SPeerState peerStates[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
M
Minghao Li 已提交
207

M
Minghao Li 已提交
208 209
  // is config changing
  bool changing;
210

211
  int64_t snapshottingIndex;
M
Minghao Li 已提交
212 213
  int64_t snapshottingTime;
  int64_t minMatchIndex;
214

M
Minghao Li 已提交
215
  int64_t startTime;
216
  int64_t leaderTime;
M
Minghao Li 已提交
217 218
  int64_t lastReplicateTime;

219 220 221
  int32_t electNum;
  int32_t becomeLeaderNum;
  int32_t configChangeNum;
222 223
  int32_t hbSlowNum;
  int32_t hbrSlowNum;
M
Minghao Li 已提交
224
  int32_t tmrRoutineNum;
225

226 227
  bool isStart;

M
Minghao Li 已提交
228 229
} SSyncNode;

M
Minghao Li 已提交
230
// open/close --------------
231
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
B
Benguang Zhao 已提交
232 233
int32_t    syncNodeStart(SSyncNode* pSyncNode);
int32_t    syncNodeStartStandBy(SSyncNode* pSyncNode);
M
syncInt  
Minghao Li 已提交
234
void       syncNodeClose(SSyncNode* pSyncNode);
M
Minghao Li 已提交
235
void       syncNodePreClose(SSyncNode* pSyncNode);
236
void       syncNodePostClose(SSyncNode* pSyncNode);
S
Shengliang Guan 已提交
237
int32_t    syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
B
Benguang Zhao 已提交
238
int32_t    syncNodeRestore(SSyncNode* pSyncNode);
239
void       syncHbTimerDataFree(SSyncHbTimerData* pData);
M
Minghao Li 已提交
240

S
Shengliang Guan 已提交
241 242 243 244 245 246 247 248
// on message ---------------------
// One handler per incoming message kind. Each takes the node and the RPC
// message and returns an int32_t status. All but syncNodeOnClientRequest take
// the message as const; syncNodeOnClientRequest additionally has an output
// parameter pRetIndex.
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
M
Minghao Li 已提交
253

M
Minghao Li 已提交
254
// timer control --------------
// Start/stop/restart controls for the ping, elect and heartbeat timers of a
// node. The elect-timer start/restart functions take the timeout in
// milliseconds (ms); syncNodeResetElectTimer takes no timeout argument.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
void    syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
M
Minghao Li 已提交
264 265

// utils --------------
M
Minghao Li 已提交
266 267
int32_t   syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
268
int32_t   syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
269
bool      syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode);
270 271
bool      syncNodeSnapshotSending(SSyncNode* pSyncNode);
bool      syncNodeSnapshotRecving(SSyncNode* pSyncNode);
272
bool      syncNodeIsReadyForRead(SSyncNode* pSyncNode);
M
Minghao Li 已提交
273

M
Minghao Li 已提交
274 275
// raft state change --------------
// Term updates and role transitions of a node. The Become* functions take a
// debugStr argument (NOTE(review): presumably used for logging only - confirm).
// None of these return a status.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);

// raft vote --------------
// Record a vote for the given term: either for the member named by pRaftId
// or, in the second form, for this node itself.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm term);
M
Minghao Li 已提交
289

B
Benguang Zhao 已提交
290 291 292
// log replication
// Returns the SSyncLogReplMgr associated with pDestId on pNode.
// NOTE(review): presumably an entry of SSyncNode.logReplMgrs, and NULL when
// pDestId is not a known peer - confirm in the implementation.
SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId);

M
Minghao Li 已提交
293
// snapshot --------------
// Snapshot-related queries and actions on a node; syncNodeStartSnapshot
// targets the peer identified by pDestId and returns an int32_t status.
bool    syncNodeHasSnapshot(SSyncNode* pSyncNode);
void    syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId);
M
Minghao Li 已提交
297

298
// Accessors for the last index/term of a node and for the index/term that
// precede a given index. The *IndexTerm variants deliver both values through
// output pointers and return an int32_t status.
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
SyncTerm  syncNodeGetLastTerm(SSyncNode* pSyncNode);
int32_t   syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode);
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm  syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
int32_t   syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
M
Minghao Li 已提交
305

M
Minghao Li 已提交
306
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
M
Minghao Li 已提交
307
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex);
308
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
309

M
Minghao Li 已提交
310 311
bool                 syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
312
SSyncTimer*          syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId);
M
Minghao Li 已提交
313 314
SPeerState*          syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId);
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
315

M
Minghao Li 已提交
316 317
// Fetch snapshot metadata into *sMeta for the sync node referenced by rid;
// the second form additionally takes a snapshot index. Both return an int32_t
// status.
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
318

319
// Remaining node helpers: a quorum query on a const node, an mnode check,
// and initialisation of the per-peer state.
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
bool    syncNodeIsMnode(SSyncNode* pSyncNode);
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
M
Minghao Li 已提交
322

M
Minghao Li 已提交
323 324 325 326 327
#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_INT_H*/