syncInt.h 11.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_INT_H
#define _TD_LIBS_SYNC_INT_H

#ifdef __cplusplus
extern "C" {
#endif

M
Minghao Li 已提交
23
#include "sync.h"
M
Minghao Li 已提交
24
#include "syncTools.h"
M
Minghao Li 已提交
25
#include "tlog.h"
M
Minghao Li 已提交
26
#include "ttimer.h"
M
Minghao Li 已提交
27

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
// clang-format off
// Leveled logging macros for the sync module.
// Each macro tests one DEBUG_* bit in sDebugFlag and, when it is set, forwards
// its arguments to taosPrintLog (or taosPrintLongString for the *Long variants)
// with a "SYN ..." tag. Fatal/error/warn/info pass the literal 255 as the third
// argument; debug/trace pass sDebugFlag itself.
#define sFatal(...) \
  do { \
    if (sDebugFlag & DEBUG_FATAL) { \
      taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sError(...) \
  do { \
    if (sDebugFlag & DEBUG_ERROR) { \
      taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sWarn(...) \
  do { \
    if (sDebugFlag & DEBUG_WARN) { \
      taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sInfo(...) \
  do { \
    if (sDebugFlag & DEBUG_INFO) { \
      taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sDebug(...) \
  do { \
    if (sDebugFlag & DEBUG_DEBUG) { \
      taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
#define sTrace(...) \
  do { \
    if (sDebugFlag & DEBUG_TRACE) { \
      taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
    } \
  } while (0)

// Same levels as above, routed through taosPrintLongString.
#define sFatalLong(...) \
  do { \
    if (sDebugFlag & DEBUG_FATAL) { \
      taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sErrorLong(...) \
  do { \
    if (sDebugFlag & DEBUG_ERROR) { \
      taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sWarnLong(...) \
  do { \
    if (sDebugFlag & DEBUG_WARN) { \
      taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sInfoLong(...) \
  do { \
    if (sDebugFlag & DEBUG_INFO) { \
      taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); \
    } \
  } while (0)
#define sDebugLong(...) \
  do { \
    if (sDebugFlag & DEBUG_DEBUG) { \
      taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
#define sTraceLong(...) \
  do { \
    if (sDebugFlag & DEBUG_TRACE) { \
      taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
// clang-format on

// Forward declarations: let this header refer to the types below by pointer
// without needing their full definitions.

// Sync message types (several appear as the pMsg argument of functions declared
// later in this header).
typedef struct SyncTimeout            SyncTimeout;
typedef struct SyncClientRequest      SyncClientRequest;
typedef struct SyncPing               SyncPing;
typedef struct SyncPingReply          SyncPingReply;
typedef struct SyncRequestVote        SyncRequestVote;
typedef struct SyncRequestVoteReply   SyncRequestVoteReply;
typedef struct SyncAppendEntries      SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;

// Internal helper objects; most are held by pointer inside SSyncNode.
typedef struct SSyncEnv               SSyncEnv;
typedef struct SRaftStore             SRaftStore;
typedef struct SVotesGranted          SVotesGranted;
typedef struct SVotesRespond          SVotesRespond;
typedef struct SSyncIndexMgr          SSyncIndexMgr;
typedef struct SRaftCfg               SRaftCfg;
typedef struct SSyncRespMgr           SSyncRespMgr;
typedef struct SSyncSnapshotSender    SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver  SSyncSnapshotReceiver;
M
Minghao Li 已提交
60

61 62
// Global flag, declared here and defined in another translation unit.
// NOTE(review): judging by the name it toggles detailed raft logging — confirm at the definition.
extern bool gRaftDetailLog;

M
Minghao Li 已提交
63
typedef struct SSyncNode {
M
syncInt  
Minghao Li 已提交
64
  // init by SSyncInfo
M
Minghao Li 已提交
65
  SyncGroupId vgId;
M
Minghao Li 已提交
66
  SRaftCfg*   pRaftCfg;
M
Minghao Li 已提交
67
  char        path[TSDB_FILENAME_LEN];
M
Minghao Li 已提交
68
  char        raftStorePath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
69
  char        configPath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
70 71

  // sync io
S
Shengliang Guan 已提交
72 73 74 75
  SWal*         pWal;
  const SMsgCb* msgcb;
  int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
  int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
M
Minghao Li 已提交
76

M
syncInt  
Minghao Li 已提交
77
  // init internal
M
Minghao Li 已提交
78 79
  SNodeInfo myNodeInfo;
  SRaftId   myRaftId;
M
Minghao Li 已提交
80

M
Minghao Li 已提交
81
  int32_t   peersNum;
M
Minghao Li 已提交
82
  SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
83 84 85 86
  SRaftId   peersId[TSDB_MAX_REPLICA];

  int32_t replicaNum;
  SRaftId replicasId[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
87

M
syncInt  
Minghao Li 已提交
88 89 90
  // raft algorithm
  SSyncFSM* pFsm;
  int32_t   quorum;
M
Minghao Li 已提交
91
  SRaftId   leaderCache;
M
syncInt  
Minghao Li 已提交
92 93 94 95 96 97 98 99 100

  // life cycle
  int64_t rid;

  // tla+ server vars
  ESyncState  state;
  SRaftStore* pRaftStore;

  // tla+ candidate vars
M
Minghao Li 已提交
101 102
  SVotesGranted* pVotesGranted;
  SVotesRespond* pVotesRespond;
M
Minghao Li 已提交
103

M
syncInt  
Minghao Li 已提交
104
  // tla+ leader vars
M
Minghao Li 已提交
105 106
  SSyncIndexMgr* pNextIndex;
  SSyncIndexMgr* pMatchIndex;
M
syncInt  
Minghao Li 已提交
107 108 109 110 111

  // tla+ log vars
  SSyncLogStore* pLogStore;
  SyncIndex      commitIndex;

M
Minghao Li 已提交
112 113 114 115 116
  // timer ms init
  int32_t pingBaseLine;
  int32_t electBaseLine;
  int32_t hbBaseLine;

M
Minghao Li 已提交
117
  // ping timer
M
Minghao Li 已提交
118 119
  tmr_h             pPingTimer;
  int32_t           pingTimerMS;
M
Minghao Li 已提交
120 121
  uint64_t          pingTimerLogicClock;
  uint64_t          pingTimerLogicClockUser;
M
Minghao Li 已提交
122
  TAOS_TMR_CALLBACK FpPingTimerCB;  // Timer Fp
M
Minghao Li 已提交
123 124
  uint64_t          pingTimerCounter;

M
Minghao Li 已提交
125
  // elect timer
M
Minghao Li 已提交
126 127
  tmr_h             pElectTimer;
  int32_t           electTimerMS;
M
Minghao Li 已提交
128 129
  uint64_t          electTimerLogicClock;
  uint64_t          electTimerLogicClockUser;
M
Minghao Li 已提交
130
  TAOS_TMR_CALLBACK FpElectTimerCB;  // Timer Fp
M
Minghao Li 已提交
131 132
  uint64_t          electTimerCounter;

M
Minghao Li 已提交
133
  // heartbeat timer
M
Minghao Li 已提交
134 135
  tmr_h             pHeartbeatTimer;
  int32_t           heartbeatTimerMS;
M
Minghao Li 已提交
136 137
  uint64_t          heartbeatTimerLogicClock;
  uint64_t          heartbeatTimerLogicClockUser;
M
Minghao Li 已提交
138
  TAOS_TMR_CALLBACK FpHeartbeatTimerCB;  // Timer Fp
M
Minghao Li 已提交
139 140 141
  uint64_t          heartbeatTimerCounter;

  // callback
M
Minghao Li 已提交
142 143 144 145 146 147 148 149
  FpOnPingCb               FpOnPing;
  FpOnPingReplyCb          FpOnPingReply;
  FpOnClientRequestCb      FpOnClientRequest;
  FpOnTimeoutCb            FpOnTimeout;
  FpOnRequestVoteCb        FpOnRequestVote;
  FpOnRequestVoteReplyCb   FpOnRequestVoteReply;
  FpOnAppendEntriesCb      FpOnAppendEntries;
  FpOnAppendEntriesReplyCb FpOnAppendEntriesReply;
M
Minghao Li 已提交
150 151
  FpOnSnapshotSendCb       FpOnSnapshotSend;
  FpOnSnapshotRspCb        FpOnSnapshotRsp;
M
Minghao Li 已提交
152

M
Minghao Li 已提交
153 154 155
  // tools
  SSyncRespMgr* pSyncRespMgr;

156
  // restore state
157 158
  bool restoreFinish;
  // SSnapshot*             pSnapshot;
M
Minghao Li 已提交
159
  SSyncSnapshotSender*   senders[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
160
  SSyncSnapshotReceiver* pNewNodeReceiver;
161

M
Minghao Li 已提交
162 163
  // is config changing
  bool changing;
164

M
Minghao Li 已提交
165 166
} SSyncNode;

M
Minghao Li 已提交
167
// open/close --------------
M
Minghao Li 已提交
168
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
M
Minghao Li 已提交
169
void       syncNodeStart(SSyncNode* pSyncNode);
M
Minghao Li 已提交
170
void       syncNodeStartStandBy(SSyncNode* pSyncNode);
M
syncInt  
Minghao Li 已提交
171
void       syncNodeClose(SSyncNode* pSyncNode);
172
int32_t    syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
173
int32_t    syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
M
Minghao Li 已提交
174

M
Minghao Li 已提交
175
// option
// Queries on a node's configuration/strategy; semantics are defined in the
// implementation and are not visible from these prototypes.
bool          syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
SyncIndex     syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
M
Minghao Li 已提交
179

M
Minghao Li 已提交
180
// ping --------------
M
Minghao Li 已提交
181
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
M
Minghao Li 已提交
182
int32_t syncNodePingSelf(SSyncNode* pSyncNode);
M
Minghao Li 已提交
183 184
int32_t syncNodePingPeers(SSyncNode* pSyncNode);
int32_t syncNodePingAll(SSyncNode* pSyncNode);
M
Minghao Li 已提交
185

M
Minghao Li 已提交
186
// timer control --------------
// Start/stop/restart/reset helpers for the ping, elect and heartbeat timers whose
// state lives in SSyncNode. The `ms` argument of the elect-timer functions is
// presumably a duration in milliseconds — confirm against the implementation.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
M
Minghao Li 已提交
196 197

// utils --------------
M
Minghao Li 已提交
198 199 200 201
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
cJSON*  syncNode2Json(const SSyncNode* pSyncNode);
char*   syncNode2Str(const SSyncNode* pSyncNode);
M
Minghao Li 已提交
202
void    syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
M
Minghao Li 已提交
203
void    syncNodeErrorLog(const SSyncNode* pSyncNode, char* str);
M
Minghao Li 已提交
204
char*   syncNode2SimpleStr(const SSyncNode* pSyncNode);
205
bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
M
Minghao Li 已提交
206
void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
M
Minghao Li 已提交
207

M
Minghao Li 已提交
208 209
SSyncNode* syncNodeAcquire(int64_t rid);
void       syncNodeRelease(SSyncNode* pNode);
M
Minghao Li 已提交
210

M
Minghao Li 已提交
211 212
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
// debugStr is a caller-supplied read-only string (presumably used for logging — confirm).
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);

// Role transitions, named <from>2<to>.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);

// raft vote --------------
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);

M
Minghao Li 已提交
225
// snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);

// Last index/term accessors. The combined form writes through pLastIndex and
// pLastTerm and returns an int32_t (presumably a status code — confirm).
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
SyncTerm  syncNodeGetLastTerm(SSyncNode* pSyncNode);
int32_t   syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);

SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode);

// "Pre" index/term accessors relative to `index`; the combined form writes
// through pPreIndex and pPreTerm.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm  syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
int32_t   syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
M
Minghao Li 已提交
238

239
bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
240
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
241
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
242

243 244
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);

M
Minghao Li 已提交
245 246
bool                 syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
247

M
Minghao Li 已提交
248 249
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
250

M
Minghao Li 已提交
251 252 253
void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid);

M
Minghao Li 已提交
254 255 256
bool syncNodeCanChange(SSyncNode* pSyncNode);
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);

M
Minghao Li 已提交
257 258
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
259
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
M
Minghao Li 已提交
260

M
Minghao Li 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
// trace log
// One Send/Recv pair per message type. Each function takes the node, a read-only
// message, and a caller-supplied string `s` (presumably extra context for the log
// line — confirm against the implementation).
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);

void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);

void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);

// NOTE(review): SyncAppendEntriesBatch is not forward-declared in this header;
// it must come from one of the included headers — confirm.
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);

void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);

M
Minghao Li 已提交
277 278 279 280 281
// for debug --------------
// Dump helpers for a node. The *2/*3 variants additionally take a caller-supplied
// string `s` (presumably a label printed with the dump — confirm).
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
void syncNodeLog(SSyncNode* pObj);
void syncNodeLog2(char* s, SSyncNode* pObj);
void syncNodeLog3(char* s, SSyncNode* pObj);
M
Minghao Li 已提交
283

M
Minghao Li 已提交
284 285 286 287 288
#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_INT_H*/