/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_H
#define _TD_LIBS_SYNC_H

#ifdef __cplusplus
extern "C" {
#endif

#include "cJSON.h"
#include "tdef.h"
#include "tmsgcb.h"

// Sentinel values for SyncIndex: the first index value and the "no index" marker.
// The negative value is parenthesized so it expands safely inside any expression.
#define SYNC_INDEX_BEGIN   0
#define SYNC_INDEX_INVALID (-1)

// Scalar types used throughout the sync interface.
// SyncIndex is signed, so it can hold SYNC_INDEX_INVALID (-1).
typedef uint64_t SyncNodeId;
typedef int32_t  SyncGroupId;
typedef int64_t  SyncIndex;
typedef uint64_t SyncTerm;

// Forward declarations: these structs are only used through pointers in this header.
typedef struct SSyncNode      SSyncNode;
typedef struct SSyncBuffer    SSyncBuffer;
typedef struct SWal           SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;

// State (role) of a sync node, as returned by syncGetMyRole().
// Values start at 100 rather than 0.
typedef enum {
  TAOS_SYNC_STATE_FOLLOWER = 100,
  TAOS_SYNC_STATE_CANDIDATE = 101,
  TAOS_SYNC_STATE_LEADER = 102,
  TAOS_SYNC_STATE_ERROR = 103,
} ESyncState;

// Outcome codes for a propose operation.
// NOTE(review): presumably the values returned by syncPropose() — confirm against
// the implementation.
typedef enum {
  TAOS_SYNC_PROPOSE_SUCCESS = 0,
  TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
  TAOS_SYNC_ONLY_ONE_REPLICA = 2,
  TAOS_SYNC_NOT_IN_NEW_CONFIG = 3,
  TAOS_SYNC_OTHER_ERROR = 100,
} ESyncProposeCode;

// Status codes associated with FSM callbacks: success or a generic error.
typedef enum {
  TAOS_SYNC_FSM_CB_SUCCESS = 0,
  TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
} ESyncFsmCbCode;

typedef struct SNodeInfo {
M
Minghao Li 已提交
61 62
  uint16_t nodePort;
  char     nodeFqdn[TSDB_FQDN_LEN];
S
Shengliang Guan 已提交
63 64
} SNodeInfo;

M
Minghao Li 已提交
65
typedef struct SSyncCfg {
M
Minghao Li 已提交
66
  int32_t   replicaNum;
M
Minghao Li 已提交
67
  int32_t   myIndex;
S
Shengliang Guan 已提交
68
  SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
69
} SSyncCfg;
S
Shengliang Guan 已提交
70

M
Minghao Li 已提交
71 72 73 74 75 76
typedef struct SFsmCbMeta {
  SyncIndex  index;
  bool       isWeak;
  int32_t    code;
  ESyncState state;
  uint64_t   seqNum;
77 78
  SyncTerm   term;
  SyncTerm   currentTerm;
79
  uint64_t   flag;
M
Minghao Li 已提交
80 81
} SFsmCbMeta;

82 83 84 85 86
typedef struct SReConfigCbMeta {
  int32_t   code;
  SyncIndex index;
  SyncTerm  term;
  SyncTerm  currentTerm;
M
Minghao Li 已提交
87
  SSyncCfg  oldCfg;
88
  SSyncCfg  newCfg;
M
Minghao Li 已提交
89 90
  bool      isDrop;
  uint64_t  flag;
91
  uint64_t  seqNum;
92 93
} SReConfigCbMeta;

94
typedef struct SSnapshot {
M
Minghao Li 已提交
95
  void*     data;
96 97
  SyncIndex lastApplyIndex;
  SyncTerm  lastApplyTerm;
M
Minghao Li 已提交
98
  SyncIndex lastConfigIndex;
99 100
} SSnapshot;

101 102 103 104
// Snapshot metadata exchanged through syncGetSnapshotMeta().
typedef struct SSnapshotMeta {
  SyncIndex lastConfigIndex;
} SSnapshotMeta;

typedef struct SSyncFSM {
  void* data;
107

M
Minghao Li 已提交
108 109 110
  void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
  void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
  void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
111

112
  void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
113
  void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
M
Minghao Li 已提交
114
  void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
M
Minghao Li 已提交
115

116
  int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
117

118 119 120
  int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
  int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
  int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
121

122 123 124
  int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void** ppWriter);
  int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply);
  int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
125

S
Shengliang Guan 已提交
126 127
} SSyncFSM;

M
Minghao Li 已提交
128 129
// abstract definition of log store in raft
// SWal implements it
S
Shengliang Guan 已提交
130
typedef struct SSyncLogStore {
M
Minghao Li 已提交
131 132 133
  void* data;

  // append one log entry
M
Minghao Li 已提交
134
  int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
S
Shengliang Guan 已提交
135

M
Minghao Li 已提交
136
  // get one log entry, user need to free pEntry->pCont
M
Minghao Li 已提交
137
  SSyncRaftEntry* (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index);
S
Shengliang Guan 已提交
138

M
Minghao Li 已提交
139 140
  // truncate log with index, entries after the given index (>=index) will be deleted
  int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
S
Shengliang Guan 已提交
141

M
Minghao Li 已提交
142 143 144 145 146
  // return index of last entry
  SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore);

  // return term of last entry
  SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore);
S
Shengliang Guan 已提交
147

M
Minghao Li 已提交
148 149 150 151 152 153
  // update log store commit index with "index"
  int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);

  // return commit index of log
  SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);

M
Minghao Li 已提交
154 155
  // refactor, log[0 .. n] ==> log[m .. n]
  int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
M
Minghao Li 已提交
156
  int32_t (*syncLogResetBeginIndex)(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
157 158 159 160 161 162
  SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore);
  SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
  bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
  int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
  bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index);

M
Minghao Li 已提交
163
  SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
164 165 166 167 168 169 170
  SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
  SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);

  int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
  int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
  int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);

S
Shengliang Guan 已提交
171 172
} SSyncLogStore;

M
Minghao Li 已提交
173
typedef struct SSyncInfo {
M
Minghao Li 已提交
174
  bool        isStandBy;
M
Minghao Li 已提交
175
  bool        snapshotEnable;
M
Minghao Li 已提交
176 177
  SyncGroupId vgId;
  SSyncCfg    syncCfg;
M
Minghao Li 已提交
178
  char        path[TSDB_FILENAME_LEN];
M
Minghao Li 已提交
179
  SWal*       pWal;
M
Minghao Li 已提交
180
  SSyncFSM*   pFsm;
S
Shengliang Guan 已提交
181 182 183
  SMsgCb*     msgcb;
  int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
  int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
S
Shengliang Guan 已提交
184 185
} SSyncInfo;

M
Minghao Li 已提交
186 187 188 189 190
int32_t     syncInit();
void        syncCleanUp();
int64_t     syncOpen(const SSyncInfo* pSyncInfo);
void        syncStart(int64_t rid);
void        syncStop(int64_t rid);
M
Minghao Li 已提交
191
int32_t     syncSetStandby(int64_t rid);
M
Minghao Li 已提交
192 193 194
ESyncState  syncGetMyRole(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid);
SyncTerm    syncGetMyTerm(int64_t rid);
M
Minghao Li 已提交
195
void        syncGetEpSet(int64_t rid, SEpSet* pEpSet);
M
Minghao Li 已提交
196
int32_t     syncGetVgId(int64_t rid);
197 198
int32_t     syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool        syncEnvIsStart();
M
Minghao Li 已提交
199
const char* syncStr(ESyncState state);
M
Minghao Li 已提交
200
bool        syncIsRestoreFinish(int64_t rid);
201
int32_t     syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
S
Shengliang Guan 已提交
202

203
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
M
Minghao Li 已提交
204 205 206

// build SRpcMsg, need to call syncPropose with SRpcMsg
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
207

208 209 210
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);

S
Shengliang Guan 已提交
211 212 213 214 215
#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_H*/