/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_H
#define _TD_LIBS_SYNC_H

#ifdef __cplusplus
extern "C" {
#endif

M
Minghao Li 已提交
23
#include "cJSON.h"
M
Minghao Li 已提交
24
#include "tdef.h"
S
Shengliang Guan 已提交
25
#include "tmsgcb.h"
S
Shengliang Guan 已提交
26

M
Minghao Li 已提交
27 28
// Global flag defined in another translation unit. The name suggests it
// enables detailed raft logging -- TODO confirm at the definition site.
extern bool gRaftDetailLog;

29 30 31 32 33 34 35
// Tuning constants for the sync module. Their consumers are outside this
// header; the *_MS suffix suggests millisecond units -- TODO confirm at the
// use sites.
#define SYNC_RESP_TTL_MS             10000000
#define SYNC_SPEED_UP_HB_TIMER       400
#define SYNC_SPEED_UP_AFTER_MS       (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE         100
#define SYNC_MAX_READ_RANGE          2
#define SYNC_MAX_PROGRESS_WAIT_MS    4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS  1200
#define SYNC_ADD_QUORUM_COUNT        3
M
Minghao Li 已提交
38

M
Minghao Li 已提交
39 40 41 42
// Batch-size limit and sentinel values for log indexes and terms.
// SYNC_INDEX_INVALID is parenthesized so the negative literal stays a single
// operand wherever the macro is expanded.
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN    0
#define SYNC_INDEX_INVALID  (-1)
#define SYNC_TERM_INVALID   0xFFFFFFFFFFFFFFFF
43

M
Minghao Li 已提交
44 45 46 47 48 49
// Snapshot strategy selector; stored in SSyncInfo.snapshotStrategy.
// The numeric values are explicit and should be treated as part of the
// interface.
typedef enum {
  SYNC_STRATEGY_NO_SNAPSHOT = 0,
  SYNC_STRATEGY_STANDARD_SNAPSHOT = 1,
  SYNC_STRATEGY_WAL_FIRST = 2,
} ESyncStrategy;

M
Minghao Li 已提交
50
// Scalar aliases used throughout the sync API. SyncIndex is signed (the
// invalid-index sentinel is negative); SyncTerm and SyncNodeId are unsigned.
typedef uint64_t SyncNodeId;
typedef int32_t  SyncGroupId;
typedef int64_t  SyncIndex;
typedef uint64_t SyncTerm;

55 56 57 58 59
// Forward declarations only: these struct bodies are not defined in this
// header, so the types can be used here solely through pointers.
typedef struct SSyncNode      SSyncNode;
typedef struct SSyncBuffer    SSyncBuffer;
typedef struct SWal           SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;

S
Shengliang Guan 已提交
60
// Role/state of a sync node. Values are explicit and start at 100.
typedef enum {
  TAOS_SYNC_STATE_FOLLOWER = 100,
  TAOS_SYNC_STATE_CANDIDATE = 101,
  TAOS_SYNC_STATE_LEADER = 102,
  TAOS_SYNC_STATE_ERROR = 103,
} ESyncState;
S
Shengliang Guan 已提交
66

M
Minghao Li 已提交
67
typedef struct SNodeInfo {
M
Minghao Li 已提交
68 69
  uint16_t nodePort;
  char     nodeFqdn[TSDB_FQDN_LEN];
S
Shengliang Guan 已提交
70 71
} SNodeInfo;

M
Minghao Li 已提交
72
typedef struct SSyncCfg {
M
Minghao Li 已提交
73
  int32_t   replicaNum;
M
Minghao Li 已提交
74
  int32_t   myIndex;
S
Shengliang Guan 已提交
75
  SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
76
} SSyncCfg;
S
Shengliang Guan 已提交
77

M
Minghao Li 已提交
78
typedef struct SFsmCbMeta {
M
Minghao Li 已提交
79
  int32_t    code;
M
Minghao Li 已提交
80
  SyncIndex  index;
M
Minghao Li 已提交
81 82
  SyncTerm   term;
  uint64_t   seqNum;
83
  SyncIndex  lastConfigIndex;
M
Minghao Li 已提交
84
  ESyncState state;
85
  SyncTerm   currentTerm;
M
Minghao Li 已提交
86
  bool       isWeak;
87
  uint64_t   flag;
M
Minghao Li 已提交
88 89
} SFsmCbMeta;

90
typedef struct SReConfigCbMeta {
M
Minghao Li 已提交
91 92 93 94 95 96 97 98 99 100 101
  int32_t    code;
  SyncIndex  index;
  SyncTerm   term;
  uint64_t   seqNum;
  SyncIndex  lastConfigIndex;
  ESyncState state;
  SyncTerm   currentTerm;
  bool       isWeak;
  uint64_t   flag;

  // config info
M
Minghao Li 已提交
102
  SSyncCfg  oldCfg;
103
  SSyncCfg  newCfg;
M
Minghao Li 已提交
104 105 106 107
  SyncIndex newCfgIndex;
  SyncTerm  newCfgTerm;
  uint64_t  newCfgSeqNum;

108 109
} SReConfigCbMeta;

110 111 112 113 114
// Pair of log indexes delimiting a snapshot range. Whether `end` is
// inclusive is not visible from this header -- TODO confirm with the users.
typedef struct SSnapshotParam {
  SyncIndex start;
  SyncIndex end;
} SSnapshotParam;

115
typedef struct SSnapshot {
M
Minghao Li 已提交
116
  void*     data;
117 118
  SyncIndex lastApplyIndex;
  SyncTerm  lastApplyTerm;
M
Minghao Li 已提交
119
  SyncIndex lastConfigIndex;
120 121
} SSnapshot;

122 123 124 125
// Snapshot metadata; currently carries only the last configuration index.
typedef struct SSnapshotMeta {
  SyncIndex lastConfigIndex;
} SSnapshotMeta;

M
Minghao Li 已提交
126 127
typedef struct SSyncFSM {
  void* data;
128

M
Minghao Li 已提交
129 130 131
  void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
  void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
  void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
132

133
  void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
134
  void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
M
Minghao Li 已提交
135
  void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
M
Minghao Li 已提交
136

137 138 139
  void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm);
  void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm);

M
Minghao Li 已提交
140
  int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
141
  int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
142

143
  int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
144 145
  int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
  int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
146

147
  int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
148
  int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
149
  int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
150

S
Shengliang Guan 已提交
151 152
} SSyncFSM;

M
Minghao Li 已提交
153 154
// abstract definition of log store in raft
// SWal implements it
S
Shengliang Guan 已提交
155
typedef struct SSyncLogStore {
M
Minghao Li 已提交
156 157 158
  void* data;

  // append one log entry
M
Minghao Li 已提交
159
  int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
S
Shengliang Guan 已提交
160

M
Minghao Li 已提交
161
  // get one log entry, user need to free pEntry->pCont
M
Minghao Li 已提交
162
  SSyncRaftEntry* (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index);
S
Shengliang Guan 已提交
163

M
Minghao Li 已提交
164 165
  // truncate log with index, entries after the given index (>=index) will be deleted
  int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
S
Shengliang Guan 已提交
166

M
Minghao Li 已提交
167 168 169 170 171
  // return index of last entry
  SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore);

  // return term of last entry
  SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore);
S
Shengliang Guan 已提交
172

M
Minghao Li 已提交
173 174 175 176 177 178
  // update log store commit index with "index"
  int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);

  // return commit index of log
  SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);

M
Minghao Li 已提交
179 180 181 182
  SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore);
  SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
  bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
  int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
183
  int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index);
184
  bool (*syncLogExist)(struct SSyncLogStore* pLogStore, SyncIndex index);
M
Minghao Li 已提交
185

M
Minghao Li 已提交
186
  SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
187 188 189 190 191 192 193
  SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
  SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);

  int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
  int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
  int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);

S
Shengliang Guan 已提交
194 195
} SSyncLogStore;

M
Minghao Li 已提交
196
typedef struct SSyncInfo {
M
Minghao Li 已提交
197 198 199 200 201 202 203 204 205
  bool          isStandBy;
  ESyncStrategy snapshotStrategy;
  SyncGroupId   vgId;
  int32_t       batchSize;
  SSyncCfg      syncCfg;
  char          path[TSDB_FILENAME_LEN];
  SWal*         pWal;
  SSyncFSM*     pFsm;
  SMsgCb*       msgcb;
S
Shengliang Guan 已提交
206 207
  int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
  int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
S
Shengliang Guan 已提交
208 209
} SSyncInfo;

M
Minghao Li 已提交
210 211 212 213 214
int32_t     syncInit();
void        syncCleanUp();
int64_t     syncOpen(const SSyncInfo* pSyncInfo);
void        syncStart(int64_t rid);
void        syncStop(int64_t rid);
M
Minghao Li 已提交
215
int32_t     syncSetStandby(int64_t rid);
M
Minghao Li 已提交
216
ESyncState  syncGetMyRole(int64_t rid);
M
Minghao Li 已提交
217
bool        syncIsReady(int64_t rid);
218
bool        syncIsReadyForRead(int64_t rid);
M
Minghao Li 已提交
219
const char* syncGetMyRoleStr(int64_t rid);
M
Minghao Li 已提交
220
bool        syncRestoreFinish(int64_t rid);
M
Minghao Li 已提交
221
SyncTerm    syncGetMyTerm(int64_t rid);
222 223
SyncIndex   syncGetLastIndex(int64_t rid);
SyncIndex   syncGetCommitIndex(int64_t rid);
M
Minghao Li 已提交
224
SyncGroupId syncGetVgId(int64_t rid);
M
Minghao Li 已提交
225
void        syncGetEpSet(int64_t rid, SEpSet* pEpSet);
226
void        syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
M
Minghao Li 已提交
227
int32_t     syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
228
int32_t     syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
229
bool        syncEnvIsStart();
M
Minghao Li 已提交
230
const char* syncStr(ESyncState state);
M
Minghao Li 已提交
231
bool        syncIsRestoreFinish(int64_t rid);
232
int32_t     syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
233

234
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
M
Minghao Li 已提交
235 236 237

// build SRpcMsg, need to call syncPropose with SRpcMsg
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
238

239 240 241
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);

S
Shengliang Guan 已提交
242 243 244 245 246
#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_H*/