sync.h 8.7 KB
Newer Older
S
Shengliang Guan 已提交
1
/*
M
Minghao Li 已提交
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
S
Shengliang Guan 已提交
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_H
#define _TD_LIBS_SYNC_H

#ifdef __cplusplus
extern "C" {
#endif

M
Minghao Li 已提交
23
#include "cJSON.h"
M
Minghao Li 已提交
24
#include "tdef.h"
25
#include "tlrucache.h"
S
Shengliang Guan 已提交
26
#include "tmsgcb.h"
S
Shengliang Guan 已提交
27

28
// Tunables for the sync module. Names ending in _MS are presumably durations in
// milliseconds -- confirm at the use sites.
// Every macro whose expansion is an expression (rather than a single literal) is
// fully parenthesized so it expands correctly inside a larger expression: e.g.
// `x / SNAPSHOT_WAIT_MS` must divide by 30000, not expand to `x / 1000 * 30`.
#define SYNC_RESP_TTL_MS             30000
#define SYNC_SPEED_UP_HB_TIMER       400
#define SYNC_SPEED_UP_AFTER_MS       (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE         100
#define SYNC_MAX_READ_RANGE          2
#define SYNC_MAX_PROGRESS_WAIT_MS    4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS  1200
#define SYNC_DEL_WAL_MS              (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT        3
#define SYNC_VNODE_LOG_RETENTION     (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
#define SNAPSHOT_MAX_CLOCK_SKEW_MS   (1000 * 10)
#define SNAPSHOT_WAIT_MS             (1000 * 30)

#define SYNC_MAX_RETRY_BACKOFF         5
#define SYNC_LOG_REPL_RETRY_WAIT_MS    100
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
#define SYNC_HEART_TIMEOUT_MS          (1000 * 15)

#define SYNC_HEARTBEAT_SLOW_MS       1500
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_SNAP_RESEND_MS          (1000 * 60)

#define SYNC_VND_COMMIT_MIN_MS 3000

#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN    0
// Sentinels for "no index" / "no term"; parenthesized like any other negative literal macro.
#define SYNC_INDEX_INVALID  (-1)
#define SYNC_TERM_INVALID   (-1)

#define SYNC_LEARNER_CATCHUP 10

M
Minghao Li 已提交
60 61 62 63 64 65
// Snapshot strategy for a sync group, selected through SSyncInfo.snapshotStrategy.
// Enumerators are numbered sequentially from 0.
typedef enum {
  SYNC_STRATEGY_NO_SNAPSHOT = 0,
  SYNC_STRATEGY_STANDARD_SNAPSHOT,  // 1
  SYNC_STRATEGY_WAL_FIRST,          // 2
} ESyncStrategy;

M
Minghao Li 已提交
66
// Scalar aliases used throughout the sync API.
typedef uint64_t SyncNodeId;   // unsigned
typedef int32_t  SyncGroupId;
typedef int64_t  SyncIndex;    // signed, so SYNC_INDEX_INVALID (-1) is representable
typedef int64_t  SyncTerm;     // signed, so SYNC_TERM_INVALID (-1) is representable

// Opaque types: only pointers to these are used here; they are not defined in this header.
typedef struct SWal           SWal;
typedef struct SSyncNode      SSyncNode;
typedef struct SSyncRaftEntry SSyncRaftEntry;

S
Shengliang Guan 已提交
75
// State of a sync node. OFFLINE is 0; the active states are numbered
// consecutively starting at 100. syncStr() maps a value to a string.
typedef enum {
  TAOS_SYNC_STATE_OFFLINE = 0,
  TAOS_SYNC_STATE_FOLLOWER = 100,
  TAOS_SYNC_STATE_CANDIDATE,  // 101
  TAOS_SYNC_STATE_LEADER,     // 102
  TAOS_SYNC_STATE_ERROR,      // 103
  TAOS_SYNC_STATE_LEARNER,    // 104
} ESyncState;
S
Shengliang Guan 已提交
83

C
cadem 已提交
84 85 86 87 88 89
// Role of a replica in the configuration (stored in SNodeInfo.nodeRole).
// Enumerators are numbered sequentially from 0.
typedef enum {
  TAOS_SYNC_ROLE_VOTER = 0,
  TAOS_SYNC_ROLE_LEARNER,  // 1
  TAOS_SYNC_ROLE_ERROR,    // 2
} ESyncRole;

M
Minghao Li 已提交
90
typedef struct SNodeInfo {
C
cadem 已提交
91 92 93 94 95
  int64_t   clusterId;
  int32_t   nodeId;
  uint16_t  nodePort;
  char      nodeFqdn[TSDB_FQDN_LEN];
  ESyncRole nodeRole;
S
Shengliang Guan 已提交
96 97
} SNodeInfo;

M
Minghao Li 已提交
98
typedef struct SSyncCfg {
C
cadem 已提交
99
  int32_t   totalReplicaNum;
M
Minghao Li 已提交
100
  int32_t   replicaNum;
M
Minghao Li 已提交
101
  int32_t   myIndex;
C
cadem 已提交
102 103
  SNodeInfo nodeInfo[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  SyncIndex lastIndex;
M
Minghao Li 已提交
104
} SSyncCfg;
S
Shengliang Guan 已提交
105

M
Minghao Li 已提交
106
typedef struct SFsmCbMeta {
M
Minghao Li 已提交
107
  int32_t    code;
M
Minghao Li 已提交
108
  SyncIndex  index;
M
Minghao Li 已提交
109 110
  SyncTerm   term;
  uint64_t   seqNum;
111
  SyncIndex  lastConfigIndex;
M
Minghao Li 已提交
112
  ESyncState state;
113
  SyncTerm   currentTerm;
M
Minghao Li 已提交
114
  bool       isWeak;
115
  uint64_t   flag;
M
Minghao Li 已提交
116 117
} SFsmCbMeta;

118
typedef struct SReConfigCbMeta {
M
Minghao Li 已提交
119 120 121 122 123 124 125 126 127 128 129
  int32_t    code;
  SyncIndex  index;
  SyncTerm   term;
  uint64_t   seqNum;
  SyncIndex  lastConfigIndex;
  ESyncState state;
  SyncTerm   currentTerm;
  bool       isWeak;
  uint64_t   flag;

  // config info
M
Minghao Li 已提交
130
  SSyncCfg  oldCfg;
131
  SSyncCfg  newCfg;
M
Minghao Li 已提交
132 133 134 135
  SyncIndex newCfgIndex;
  SyncTerm  newCfgTerm;
  uint64_t  newCfgSeqNum;

136 137
} SReConfigCbMeta;

138 139 140 141 142
typedef struct SSnapshotParam {
  SyncIndex start;
  SyncIndex end;
} SSnapshotParam;

143
typedef struct SSnapshot {
M
Minghao Li 已提交
144
  void*     data;
145 146
  SyncIndex lastApplyIndex;
  SyncTerm  lastApplyTerm;
M
Minghao Li 已提交
147
  SyncIndex lastConfigIndex;
148 149
} SSnapshot;

150 151 152 153
typedef struct SSnapshotMeta {
  SyncIndex lastConfigIndex;
} SSnapshotMeta;

M
Minghao Li 已提交
154 155
typedef struct SSyncFSM {
  void* data;
156

157
  int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
D
dapan1121 已提交
158
  SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm);
159
  int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
160
  void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
161

D
dapan1121 已提交
162
  void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
163 164
  void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
  void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
165
  bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
166
  int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm);
M
Minghao Li 已提交
167

S
Shengliang Guan 已提交
168 169
  void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm);
  void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
C
cadem 已提交
170
  void (*FpBecomeLearnerCb)(const struct SSyncFSM* pFsm);
171

S
Shengliang Guan 已提交
172
  int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
173
  void (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
174

S
Shengliang Guan 已提交
175
  int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
176
  void (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader);
S
Shengliang Guan 已提交
177
  int32_t (*FpSnapshotDoRead)(const struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
178

S
Shengliang Guan 已提交
179 180 181
  int32_t (*FpSnapshotStartWrite)(const struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
  int32_t (*FpSnapshotStopWrite)(const struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
  int32_t (*FpSnapshotDoWrite)(const struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
182

S
Shengliang Guan 已提交
183 184
} SSyncFSM;

M
Minghao Li 已提交
185 186
// abstract definition of log store in raft
// SWal implements it
S
Shengliang Guan 已提交
187
typedef struct SSyncLogStore {
188
  SLRUCache* pCache;
189 190 191 192
  int32_t    cacheHit;
  int32_t    cacheMiss;

  void* data;
M
Minghao Li 已提交
193

M
Minghao Li 已提交
194 195
  int32_t (*syncLogUpdateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
  SyncIndex (*syncLogCommitIndex)(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
196

M
Minghao Li 已提交
197 198
  SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore);
  SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
199

M
Minghao Li 已提交
200
  int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
201
  int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index);
M
Minghao Li 已提交
202
  bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
203
  bool (*syncLogExist)(struct SSyncLogStore* pLogStore, SyncIndex index);
M
Minghao Li 已提交
204

M
Minghao Li 已提交
205
  SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
206 207 208
  SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
  SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);

209
  int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);
M
Minghao Li 已提交
210 211 212
  int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
  int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);

S
Shengliang Guan 已提交
213 214
} SSyncLogStore;

M
Minghao Li 已提交
215
typedef struct SSyncInfo {
M
Minghao Li 已提交
216 217 218 219 220 221 222 223 224
  bool          isStandBy;
  ESyncStrategy snapshotStrategy;
  SyncGroupId   vgId;
  int32_t       batchSize;
  SSyncCfg      syncCfg;
  char          path[TSDB_FILENAME_LEN];
  SWal*         pWal;
  SSyncFSM*     pFsm;
  SMsgCb*       msgcb;
S
Shengliang Guan 已提交
225 226 227
  int32_t       pingMs;
  int32_t       electMs;
  int32_t       heartbeatMs;
228

S
Shengliang Guan 已提交
229 230 231
  int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
  int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
  int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
S
Shengliang Guan 已提交
232
} SSyncInfo;
233

234 235 236 237
// if state == leader
//     if restored, display "leader"
//     if !restored && canRead, display "leader*"
//     if !restored && !canRead, display "leader**"
238 239 240
typedef struct SSyncState {
  ESyncState state;
  bool       restored;
241
  bool       canRead;
242 243 244 245 246
} SSyncState;

int32_t syncInit();
void    syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo);
247
int32_t syncStart(int64_t rid);
248
void    syncStop(int64_t rid);
M
Minghao Li 已提交
249
void    syncPreStop(int64_t rid);
250
void    syncPostStop(int64_t rid);
S
Shengliang Guan 已提交
251
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
C
cadem 已提交
252
int32_t syncIsCatchUp(int64_t rid);
253
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
S
Shengliang Guan 已提交
254
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
255 256
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid);
257
int32_t syncLeaderTransfer(int64_t rid);
M
Minghao Li 已提交
258
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
259
bool    syncIsReadyForRead(int64_t rid);
260 261
bool    syncSnapshotSending(int64_t rid);
bool    syncSnapshotRecving(int64_t rid);
262
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
C
cadem 已提交
263
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
M
Minghao Li 已提交
264

265 266 267
SSyncState  syncGetState(int64_t rid);
void        syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
const char* syncStr(ESyncState state);
268

S
Shengliang Guan 已提交
269 270 271 272 273
#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_H*/