// syncInt.h
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_INT_H
#define _TD_LIBS_SYNC_INT_H

// Give every declaration below C linkage when this header is pulled into C++.
// The matching closing brace sits at the end of the file.
#ifdef __cplusplus
extern "C" {
#endif

// Project headers this file depends on (types, log flags and timer callbacks
// used below are not declared in this file).
#include "sync.h"
#include "syncTools.h"
#include "tlog.h"
#include "ttimer.h"

// clang-format off
// Sync-module logging macros.
//
// Every macro tests one level bit in sDebugFlag and, when it is set, forwards
// its arguments to the printer together with a "SYN ..." prefix and the level.
// FATAL/ERROR/WARN/INFO pass 255 as the third argument; DEBUG/TRACE pass
// sDebugFlag itself.  The do { ... } while (0) wrapper makes each macro behave
// as a single statement, so it is safe in an unbraced if/else.

// Short-message variants: forward to taosPrintLog().
#define sFatal(...) \
  do { \
    if (sDebugFlag & DEBUG_FATAL) { \
      taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sError(...) \
  do { \
    if (sDebugFlag & DEBUG_ERROR) { \
      taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sWarn(...) \
  do { \
    if (sDebugFlag & DEBUG_WARN) { \
      taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sInfo(...) \
  do { \
    if (sDebugFlag & DEBUG_INFO) { \
      taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sDebug(...) \
  do { \
    if (sDebugFlag & DEBUG_DEBUG) { \
      taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
#define sTrace(...) \
  do { \
    if (sDebugFlag & DEBUG_TRACE) { \
      taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
    } \
  } while (0)

// Long-message variants: same gating, but forward to taosPrintLongString().
#define sFatalLong(...) \
  do { \
    if (sDebugFlag & DEBUG_FATAL) { \
      taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sErrorLong(...) \
  do { \
    if (sDebugFlag & DEBUG_ERROR) { \
      taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sWarnLong(...) \
  do { \
    if (sDebugFlag & DEBUG_WARN) { \
      taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sInfoLong(...) \
  do { \
    if (sDebugFlag & DEBUG_INFO) { \
      taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sDebugLong(...) \
  do { \
    if (sDebugFlag & DEBUG_DEBUG) { \
      taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
#define sTraceLong(...) \
  do { \
    if (sDebugFlag & DEBUG_TRACE) { \
      taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
// clang-format on

// Forward declarations so the types can be referenced by pointer before (or
// without) their full definitions being visible.

// Message types.
typedef struct SyncTimeout            SyncTimeout;
typedef struct SyncClientRequest      SyncClientRequest;
typedef struct SyncPing               SyncPing;
typedef struct SyncPingReply          SyncPingReply;
typedef struct SyncRequestVote        SyncRequestVote;
typedef struct SyncRequestVoteReply   SyncRequestVoteReply;
typedef struct SyncAppendEntries      SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;

// Sync-module objects.
typedef struct SSyncEnv               SSyncEnv;
typedef struct SRaftStore             SRaftStore;
typedef struct SVotesGranted          SVotesGranted;
typedef struct SVotesRespond          SVotesRespond;
typedef struct SSyncIndexMgr          SSyncIndexMgr;
typedef struct SRaftCfg               SRaftCfg;
typedef struct SSyncRespMgr           SSyncRespMgr;
typedef struct SSyncSnapshotSender    SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver  SSyncSnapshotReceiver;
typedef struct SSyncTimer             SSyncTimer;
typedef struct SSyncHbTimerData       SSyncHbTimerData;

// Defined in another translation unit.
// NOTE(review): by its name this toggles detailed raft logging — confirm at the definition.
extern bool gRaftDetailLog;

// Data bundle associated with a heartbeat timer.
// NOTE(review): the field roles below are read off the names and types only;
// confirm against the heartbeat timer callback before relying on them.
typedef struct SSyncHbTimerData {
  SSyncNode*  pSyncNode;   // sync node this data refers to
  SSyncTimer* pTimer;      // the SSyncTimer this data refers to
  SRaftId     destId;      // raft id of a destination node (presumably the heartbeat target)
  uint64_t    logicClock;  // a logic-clock value; see SSyncTimer.logicClock
} SSyncHbTimerData;

// One timer slot; SSyncNode keeps an array of these (peerHeartbeatTimerArr),
// and the syncHbTimer* functions operate on it.
// NOTE(review): the per-field notes are inferred from names and types — confirm
// against the syncHbTimer* implementations.
typedef struct SSyncTimer {
  void*             pTimer;      // opaque timer handle
  TAOS_TMR_CALLBACK timerCb;     // callback invoked by the timer
  uint64_t          logicClock;  // logic-clock value for this timer
  uint64_t          counter;     // counter for this timer
  int32_t           timerMS;     // presumably the period in milliseconds
  SRaftId           destId;      // raft id of the destination node
  void*             pData;       // opaque user data pointer
} SSyncTimer;

// Heartbeat-timer operations on an SSyncTimer (declarations only; the
// implementations are outside this header).
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);

// Per-peer send bookkeeping; SSyncNode holds one entry per replica slot
// (peerStates[TSDB_MAX_REPLICA]).
typedef struct SPeerState {
  SyncIndex lastSendIndex;  // NOTE(review): presumably the last log index sent to the peer — confirm
  int64_t   lastSendTime;   // NOTE(review): presumably the timestamp of that send; unit not visible here
} SPeerState;

// State kept for one slot of SSyncLogReplMgr.states.
// The term field is written by syncLogReplMgrUpdateTerm(); the remaining
// fields are not touched in this header.
typedef struct SSyncReplInfo {
  bool    barrier;  // NOTE(review): presumably marks a barrier entry — confirm
  bool    acked;    // NOTE(review): presumably set once the peer acknowledged — confirm
  int64_t timeMs;   // a time value in milliseconds (per the name)
  int64_t term;     // term recorded for the entry in this slot
} SSyncReplInfo;

// Log replication manager; SSyncNode stores one pointer per replica slot
// (logReplMgrs[TSDB_MAX_REPLICA]).
// states is a fixed array of TSDB_SYNC_LOG_BUFFER_SIZE slots;
// syncLogReplMgrUpdateTerm() addresses it as a ring using index modulo size.
typedef struct SSyncLogReplMgr {
  SSyncReplInfo states[TSDB_SYNC_LOG_BUFFER_SIZE];
  int64_t       startIndex;     // lower bound (inclusive) asserted by syncLogReplMgrUpdateTerm()
  int64_t       matchIndex;     // NOTE(review): presumably the highest index known to match on the peer — confirm
  int64_t       endIndex;       // upper bound (exclusive) asserted by syncLogReplMgrUpdateTerm(); 0 makes it return -1
  int64_t       size;           // modulus used to map a log index to a states[] slot
  bool          restored;       // NOTE(review): meaning not visible in this header
  int64_t       peerStartTime;  // NOTE(review): meaning not visible in this header
  int32_t       retryBackoff;   // exponent used by syncLogGetRetryBackoffTimeMs()
  int32_t       peerId;         // NOTE(review): presumably the replica slot this manager serves — confirm
} SSyncLogReplMgr;

// Create / destroy a log replication manager.
// The create function takes no arguments; "(void)" makes that an actual
// prototype in C, so a call with arguments is rejected at compile time.
SSyncLogReplMgr* syncLogReplMgrCreate(void);
void             syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr);

// access
// Returns the retry wait for the manager's current backoff level:
// SYNC_LOG_REPL_RETRY_WAIT_MS multiplied by 2^retryBackoff.
//
// @param pMgr  replication manager; only retryBackoff is read.
// @return      the wait, in the unit of SYNC_LOG_REPL_RETRY_WAIT_MS.
static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) {
  // Widen to 64 bits before shifting: with a plain `1` the shift and the
  // multiplication are evaluated in int and only the (possibly overflowed)
  // result would be converted to int64_t.
  return ((int64_t)1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
}

// Computes the backoff level that follows the current one: retryBackoff + 1,
// limited by SYNC_MAX_RETRY_BACKOFF via TMIN.  pMgr itself is not modified.
static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
  int32_t bumped = pMgr->retryBackoff + 1;
  return TMIN(bumped, SYNC_MAX_RETRY_BACKOFF);
}

// Records `term` for log `index` in the manager's states[] ring.
//
// @param pMgr   replication manager to update.
// @param index  log index; asserted to satisfy startIndex <= index < endIndex.
// @param term   term to store in the slot.
// @return       -1 (nothing written) when endIndex is 0, otherwise 0.
static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, SyncIndex index, SyncTerm term) {
  if (pMgr->endIndex == 0) return -1;
  ASSERT(pMgr->startIndex <= index && index < pMgr->endIndex);
  // Map the log index onto its ring slot.
  pMgr->states[(index + pMgr->size) % pMgr->size].term = term;
  return 0;
}

SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
130 131
int32_t  syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
                                     SRaftId* pDestId, bool* pBarrier);
B
Benguang Zhao 已提交
132 133 134 135 136 137 138 139 140 141 142 143
int32_t  syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t  syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t  syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t  syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t  syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr);
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);

// others
bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr);

B
Benguang Zhao 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
// One slot of SSyncLogBuffer.entries: a raft entry pointer plus the index and
// term recorded for the entry preceding it.
typedef struct SSyncLogBufEntry {
  SSyncRaftEntry* pItem;         // the buffered raft entry (NOTE(review): ownership not visible here)
  SyncIndex       prevLogIndex;  // index of the preceding log entry
  SyncTerm        prevLogTerm;   // term of the preceding log entry
} SSyncLogBufEntry;

// Log buffer with a fixed array of TSDB_SYNC_LOG_BUFFER_SIZE slots;
// SSyncNode references one through pLogBuf.
// NOTE(review): the index semantics below are inferred from the names and from
// the parallel fields in SSyncLogReplMgr — confirm in the syncLogBuffer* code.
typedef struct SSyncLogBuffer {
  SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE];
  int64_t          startIndex;   // presumably the first index held in the buffer
  int64_t          commitIndex;  // presumably the highest committed index
  int64_t          matchIndex;   // presumably the highest matched index
  int64_t          endIndex;     // presumably one past the last index held
  int64_t          size;         // presumably the slot count used as ring modulus
  TdThreadMutex    mutex;        // presumably guards the fields above — confirm lock discipline
} SSyncLogBuffer;

// Log-buffer life cycle (declarations only).
// The create function takes no arguments; "(void)" makes that an actual
// prototype in C, so a call with arguments is rejected at compile time.
SSyncLogBuffer* syncLogBufferCreate(void);
void            syncLogBufferDestroy(SSyncLogBuffer* pBuf);
int32_t         syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);

// access
// Log-buffer operations (declarations only; behavior is defined by the
// implementations, which are outside this header).
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex);

int64_t            syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex);
B
Benguang Zhao 已提交
172
SyncAppendEntries* syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm);
B
Benguang Zhao 已提交
173 174

// private
175
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf);
B
Benguang Zhao 已提交
176 177
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex);
B
Benguang Zhao 已提交
178
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm);
B
Benguang Zhao 已提交
179 180
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);

181 182
void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index);

M
Minghao Li 已提交
183
typedef struct SSyncNode {
M
syncInt  
Minghao Li 已提交
184
  // init by SSyncInfo
M
Minghao Li 已提交
185
  SyncGroupId vgId;
M
Minghao Li 已提交
186
  SRaftCfg*   pRaftCfg;
M
Minghao Li 已提交
187
  char        path[TSDB_FILENAME_LEN];
M
Minghao Li 已提交
188
  char        raftStorePath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
189
  char        configPath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
190 191

  // sync io
B
Benguang Zhao 已提交
192
  SSyncLogBuffer* pLogBuf;
S
Shengliang Guan 已提交
193 194 195 196
  SWal*         pWal;
  const SMsgCb* msgcb;
  int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
  int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
197
  int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
M
Minghao Li 已提交
198

M
syncInt  
Minghao Li 已提交
199
  // init internal
M
Minghao Li 已提交
200 201
  SNodeInfo myNodeInfo;
  SRaftId   myRaftId;
M
Minghao Li 已提交
202

M
Minghao Li 已提交
203
  int32_t   peersNum;
M
Minghao Li 已提交
204
  SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
205 206 207 208
  SRaftId   peersId[TSDB_MAX_REPLICA];

  int32_t replicaNum;
  SRaftId replicasId[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
209

M
syncInt  
Minghao Li 已提交
210 211 212
  // raft algorithm
  SSyncFSM* pFsm;
  int32_t   quorum;
M
Minghao Li 已提交
213
  SRaftId   leaderCache;
M
syncInt  
Minghao Li 已提交
214 215 216 217 218 219 220 221 222

  // life cycle
  int64_t rid;

  // tla+ server vars
  ESyncState  state;
  SRaftStore* pRaftStore;

  // tla+ candidate vars
M
Minghao Li 已提交
223 224
  SVotesGranted* pVotesGranted;
  SVotesRespond* pVotesRespond;
M
Minghao Li 已提交
225

M
syncInt  
Minghao Li 已提交
226
  // tla+ leader vars
M
Minghao Li 已提交
227 228
  SSyncIndexMgr* pNextIndex;
  SSyncIndexMgr* pMatchIndex;
M
syncInt  
Minghao Li 已提交
229 230 231 232 233

  // tla+ log vars
  SSyncLogStore* pLogStore;
  SyncIndex      commitIndex;

M
Minghao Li 已提交
234 235 236 237 238
  // timer ms init
  int32_t pingBaseLine;
  int32_t electBaseLine;
  int32_t hbBaseLine;

M
Minghao Li 已提交
239
  // ping timer
M
Minghao Li 已提交
240 241
  tmr_h             pPingTimer;
  int32_t           pingTimerMS;
M
Minghao Li 已提交
242 243
  uint64_t          pingTimerLogicClock;
  uint64_t          pingTimerLogicClockUser;
M
Minghao Li 已提交
244
  TAOS_TMR_CALLBACK FpPingTimerCB;  // Timer Fp
M
Minghao Li 已提交
245 246
  uint64_t          pingTimerCounter;

M
Minghao Li 已提交
247
  // elect timer
M
Minghao Li 已提交
248 249
  tmr_h             pElectTimer;
  int32_t           electTimerMS;
M
Minghao Li 已提交
250 251
  uint64_t          electTimerLogicClock;
  uint64_t          electTimerLogicClockUser;
M
Minghao Li 已提交
252
  TAOS_TMR_CALLBACK FpElectTimerCB;  // Timer Fp
M
Minghao Li 已提交
253 254
  uint64_t          electTimerCounter;

M
Minghao Li 已提交
255
  // heartbeat timer
M
Minghao Li 已提交
256 257
  tmr_h             pHeartbeatTimer;
  int32_t           heartbeatTimerMS;
M
Minghao Li 已提交
258 259
  uint64_t          heartbeatTimerLogicClock;
  uint64_t          heartbeatTimerLogicClockUser;
M
Minghao Li 已提交
260
  TAOS_TMR_CALLBACK FpHeartbeatTimerCB;  // Timer Fp
M
Minghao Li 已提交
261 262
  uint64_t          heartbeatTimerCounter;

263 264 265
  // peer heartbeat timer
  SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA];

M
Minghao Li 已提交
266
  // callback
M
Minghao Li 已提交
267 268 269 270 271 272 273 274
  FpOnPingCb               FpOnPing;
  FpOnPingReplyCb          FpOnPingReply;
  FpOnClientRequestCb      FpOnClientRequest;
  FpOnTimeoutCb            FpOnTimeout;
  FpOnRequestVoteCb        FpOnRequestVote;
  FpOnRequestVoteReplyCb   FpOnRequestVoteReply;
  FpOnAppendEntriesCb      FpOnAppendEntries;
  FpOnAppendEntriesReplyCb FpOnAppendEntriesReply;
M
Minghao Li 已提交
275 276
  FpOnSnapshotCb           FpOnSnapshot;
  FpOnSnapshotReplyCb      FpOnSnapshotReply;
M
Minghao Li 已提交
277

M
Minghao Li 已提交
278 279 280
  // tools
  SSyncRespMgr* pSyncRespMgr;

281
  // restore state
B
Benguang Zhao 已提交
282
  bool restoreFinish;
283
  // SSnapshot*             pSnapshot;
M
Minghao Li 已提交
284
  SSyncSnapshotSender*   senders[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
285
  SSyncSnapshotReceiver* pNewNodeReceiver;
286

B
Benguang Zhao 已提交
287 288 289
  // log replication mgr
  SSyncLogReplMgr* logReplMgrs[TSDB_MAX_REPLICA];

M
Minghao Li 已提交
290 291
  SPeerState peerStates[TSDB_MAX_REPLICA];

M
Minghao Li 已提交
292 293
  // is config changing
  bool changing;
294

295
  int64_t snapshottingIndex;
M
Minghao Li 已提交
296 297
  int64_t snapshottingTime;
  int64_t minMatchIndex;
298

M
Minghao Li 已提交
299
  int64_t startTime;
300
  int64_t leaderTime;
M
Minghao Li 已提交
301 302
  int64_t lastReplicateTime;

M
Minghao Li 已提交
303 304
} SSyncNode;

M
Minghao Li 已提交
305
// open/close --------------
306
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
B
Benguang Zhao 已提交
307 308
int32_t    syncNodeStart(SSyncNode* pSyncNode);
int32_t    syncNodeStartStandBy(SSyncNode* pSyncNode);
M
syncInt  
Minghao Li 已提交
309
void       syncNodeClose(SSyncNode* pSyncNode);
310
int32_t    syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
B
Benguang Zhao 已提交
311
int32_t    syncNodeRestore(SSyncNode* pSyncNode);
M
Minghao Li 已提交
312

M
Minghao Li 已提交
313
// option
// Queries on a node's configured options (declarations only).
bool          syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
SyncIndex     syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);

// ping --------------
M
Minghao Li 已提交
319
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
M
Minghao Li 已提交
320
int32_t syncNodePingSelf(SSyncNode* pSyncNode);
M
Minghao Li 已提交
321 322
int32_t syncNodePingPeers(SSyncNode* pSyncNode);
int32_t syncNodePingAll(SSyncNode* pSyncNode);
M
Minghao Li 已提交
323

M
Minghao Li 已提交
324
// timer control --------------
// Start/stop/restart controls for the ping, elect and heartbeat timers held in
// SSyncNode (declarations only).
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);

int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);

int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);

// utils --------------
M
Minghao Li 已提交
338 339 340 341 342 343 344 345 346 347
int32_t   syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t   syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
cJSON*    syncNode2Json(const SSyncNode* pSyncNode);
char*     syncNode2Str(const SSyncNode* pSyncNode);
void      syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
void      syncNodeErrorLog(const SSyncNode* pSyncNode, char* str);
char*     syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool      syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void      syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
348
char*     syncNodePeerState2Str(const SSyncNode* pSyncNode);
M
Minghao Li 已提交
349

M
Minghao Li 已提交
350 351
SSyncNode* syncNodeAcquire(int64_t rid);
void       syncNodeRelease(SSyncNode* pNode);
M
Minghao Li 已提交
352

M
Minghao Li 已提交
353 354
// raft state change --------------
// Term updates and role transitions (declarations only).
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);

void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);

// raft vote --------------
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);

// log replication
// Looks up a replication manager for a destination (declaration only).
// NOTE(review): presumably selects from SSyncNode.logReplMgrs by pDestId — confirm.
SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId);

// snapshot --------------
// Snapshot checks and last / previous index-term lookups (declarations only).
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);

SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
SyncTerm  syncNodeGetLastTerm(SSyncNode* pSyncNode);
int32_t   syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode);
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm  syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
int32_t   syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);

bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
M
Minghao Li 已提交
385
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
M
Minghao Li 已提交
386
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex);
387
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
388

389 390
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);

M
Minghao Li 已提交
391 392
bool                 syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
393
SSyncTimer*          syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId);
M
Minghao Li 已提交
394 395
SPeerState*          syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId);
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
396

M
Minghao Li 已提交
397 398
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
399

M
Minghao Li 已提交
400
void syncStartNormal(int64_t rid);
B
Benguang Zhao 已提交
401
int32_t syncStartStandBy(int64_t rid);
M
Minghao Li 已提交
402

M
Minghao Li 已提交
403 404 405
bool syncNodeCanChange(SSyncNode* pSyncNode);
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);

M
Minghao Li 已提交
406 407
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
408
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
M
Minghao Li 已提交
409

410 411
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);

M
Minghao Li 已提交
412 413
bool    syncNodeIsMnode(SSyncNode* pSyncNode);
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
M
Minghao Li 已提交
414

M
Minghao Li 已提交
415
// trace log
416 417
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);

M
Minghao Li 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);

void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);

void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);

void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);

void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);

433 434 435 436 437 438
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);

void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);

M
Minghao Li 已提交
439 440 441 442 443
// for debug --------------
// Print / log dumps of a sync node; the "2" and "3" variants take an extra
// caller-supplied string `s` (declarations only).
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
void syncNodeLog(SSyncNode* pObj);
void syncNodeLog2(char* s, SSyncNode* pObj);
void syncNodeLog3(char* s, SSyncNode* pObj);

#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_INT_H*/