// syncInt.h
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_INT_H
#define _TD_LIBS_SYNC_INT_H

#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "sync.h"
#include "taosdef.h"
#include "tglobal.h"
#include "tlog.h"
#include "ttimer.h"

// Logging macros for the sync module.
//
// Each macro tests its own level bit in sDebugFlag and, only when that bit is
// set, forwards its arguments to taosPrintLog() with a "SYN <LEVEL> " prefix.
// The arguments are not evaluated when the level is disabled.
//
// sFatal/sError/sWarn/sInfo pass the constant 255 as the third taosPrintLog()
// argument; sDebug/sTrace pass sDebugFlag instead.
//
// The bodies are wrapped in do { ... } while (0) so that an invocation followed
// by a semicolon is exactly one statement and can be used as the unbraced body
// of an if/else. Every call site must therefore end with ';'.
#define sFatal(...)                                              \
  do {                                                           \
    if (sDebugFlag & DEBUG_FATAL) {                              \
      taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
    }                                                            \
  } while (0)
#define sError(...)                                              \
  do {                                                           \
    if (sDebugFlag & DEBUG_ERROR) {                              \
      taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
    }                                                            \
  } while (0)
#define sWarn(...)                                             \
  do {                                                         \
    if (sDebugFlag & DEBUG_WARN) {                             \
      taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
    }                                                          \
  } while (0)
#define sInfo(...)                                             \
  do {                                                         \
    if (sDebugFlag & DEBUG_INFO) {                             \
      taosPrintLog("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
    }                                                          \
  } while (0)
#define sDebug(...)                                                     \
  do {                                                                  \
    if (sDebugFlag & DEBUG_DEBUG) {                                     \
      taosPrintLog("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
    }                                                                   \
  } while (0)
#define sTrace(...)                                                     \
  do {                                                                  \
    if (sDebugFlag & DEBUG_TRACE) {                                     \
      taosPrintLog("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
    }                                                                   \
  } while (0)

// Forward declarations. None of these structs is defined in this header; they
// are only used here through pointers.

// Types passed as pMsg to the FpOn* callbacks of SSyncNode.
typedef struct SyncTimeout            SyncTimeout;
typedef struct SyncClientRequest      SyncClientRequest;
typedef struct SyncPing               SyncPing;
typedef struct SyncPingReply          SyncPingReply;
typedef struct SyncRequestVote        SyncRequestVote;
typedef struct SyncRequestVoteReply   SyncRequestVoteReply;
typedef struct SyncAppendEntries      SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;

// Not referenced elsewhere in this header.
typedef struct SSyncEnv SSyncEnv;

// Types that SSyncNode holds pointers to.
typedef struct SRaftStore    SRaftStore;
typedef struct SVotesGranted SVotesGranted;
typedef struct SVotesRespond SVotesRespond;
typedef struct SSyncIndexMgr SSyncIndexMgr;

typedef struct SRaftId {
M
Minghao Li 已提交
110 111
  SyncNodeId  addr;  // typedef uint64_t SyncNodeId;
  SyncGroupId vgId;  // typedef int32_t  SyncGroupId;
M
Minghao Li 已提交
112 113
} SRaftId;

M
Minghao Li 已提交
114
typedef struct SSyncNode {
M
syncInt  
Minghao Li 已提交
115
  // init by SSyncInfo
M
Minghao Li 已提交
116 117 118
  SyncGroupId vgId;
  SSyncCfg    syncCfg;
  char        path[TSDB_FILENAME_LEN];
M
Minghao Li 已提交
119
  char        raftStorePath[TSDB_FILENAME_LEN * 2];
M
Minghao Li 已提交
120 121 122 123

  // sync io
  SWal* pWal;
  void* rpcClient;
M
Minghao Li 已提交
124
  int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
M
Minghao Li 已提交
125 126
  void* queue;
  int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
M
Minghao Li 已提交
127

M
syncInt  
Minghao Li 已提交
128
  // init internal
M
Minghao Li 已提交
129 130
  SNodeInfo myNodeInfo;
  SRaftId   myRaftId;
M
Minghao Li 已提交
131

M
Minghao Li 已提交
132
  int32_t   peersNum;
M
Minghao Li 已提交
133
  SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
134 135 136 137
  SRaftId   peersId[TSDB_MAX_REPLICA];

  int32_t replicaNum;
  SRaftId replicasId[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
138

M
syncInt  
Minghao Li 已提交
139 140 141
  // raft algorithm
  SSyncFSM* pFsm;
  int32_t   quorum;
M
Minghao Li 已提交
142
  SRaftId   leaderCache;
M
syncInt  
Minghao Li 已提交
143 144 145 146 147 148 149 150 151

  // life cycle
  int64_t rid;

  // tla+ server vars
  ESyncState  state;
  SRaftStore* pRaftStore;

  // tla+ candidate vars
M
Minghao Li 已提交
152 153
  SVotesGranted* pVotesGranted;
  SVotesRespond* pVotesRespond;
M
Minghao Li 已提交
154

M
syncInt  
Minghao Li 已提交
155
  // tla+ leader vars
M
Minghao Li 已提交
156 157
  SSyncIndexMgr* pNextIndex;
  SSyncIndexMgr* pMatchIndex;
M
syncInt  
Minghao Li 已提交
158 159 160 161 162

  // tla+ log vars
  SSyncLogStore* pLogStore;
  SyncIndex      commitIndex;

M
Minghao Li 已提交
163 164 165 166 167
  // timer ms init
  int32_t pingBaseLine;
  int32_t electBaseLine;
  int32_t hbBaseLine;

M
Minghao Li 已提交
168
  // ping timer
M
Minghao Li 已提交
169 170
  tmr_h             pPingTimer;
  int32_t           pingTimerMS;
M
Minghao Li 已提交
171 172
  uint64_t          pingTimerLogicClock;
  uint64_t          pingTimerLogicClockUser;
M
Minghao Li 已提交
173
  TAOS_TMR_CALLBACK FpPingTimerCB;  // Timer Fp
M
Minghao Li 已提交
174 175
  uint64_t          pingTimerCounter;

M
Minghao Li 已提交
176
  // elect timer
M
Minghao Li 已提交
177 178
  tmr_h             pElectTimer;
  int32_t           electTimerMS;
M
Minghao Li 已提交
179 180
  uint64_t          electTimerLogicClock;
  uint64_t          electTimerLogicClockUser;
M
Minghao Li 已提交
181
  TAOS_TMR_CALLBACK FpElectTimerCB;  // Timer Fp
M
Minghao Li 已提交
182 183
  uint64_t          electTimerCounter;

M
Minghao Li 已提交
184
  // heartbeat timer
M
Minghao Li 已提交
185 186
  tmr_h             pHeartbeatTimer;
  int32_t           heartbeatTimerMS;
M
Minghao Li 已提交
187 188
  uint64_t          heartbeatTimerLogicClock;
  uint64_t          heartbeatTimerLogicClockUser;
M
Minghao Li 已提交
189
  TAOS_TMR_CALLBACK FpHeartbeatTimerCB;  // Timer Fp
M
Minghao Li 已提交
190 191 192
  uint64_t          heartbeatTimerCounter;

  // callback
M
Minghao Li 已提交
193 194
  int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg);
  int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg);
M
Minghao Li 已提交
195
  int32_t (*FpOnClientRequest)(SSyncNode* ths, SyncClientRequest* pMsg);
M
Minghao Li 已提交
196 197 198 199
  int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg);
  int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
  int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
  int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
M
Minghao Li 已提交
200
  int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
M
Minghao Li 已提交
201

M
Minghao Li 已提交
202 203
} SSyncNode;

M
Minghao Li 已提交
204
// open/close --------------
M
Minghao Li 已提交
205
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
M
syncInt  
Minghao Li 已提交
206
void       syncNodeClose(SSyncNode* pSyncNode);
M
Minghao Li 已提交
207

M
Minghao Li 已提交
208
// ping --------------
M
Minghao Li 已提交
209
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
M
Minghao Li 已提交
210
int32_t syncNodePingSelf(SSyncNode* pSyncNode);
M
Minghao Li 已提交
211 212
int32_t syncNodePingPeers(SSyncNode* pSyncNode);
int32_t syncNodePingAll(SSyncNode* pSyncNode);
M
Minghao Li 已提交
213

M
Minghao Li 已提交
214
// timer control --------------
// Start/stop entry points for the ping, elect and heartbeat timers.
// Only the elect timer's start and restart functions take an explicit `ms`
// argument; all the other functions take just the node.
// NOTE(review): the unit of `ms` is presumably milliseconds, going by the name
// -- confirm.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);

// utils --------------
M
Minghao Li 已提交
225 226 227 228 229 230
int32_t    syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t    syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
cJSON*     syncNode2Json(const SSyncNode* pSyncNode);
char*      syncNode2Str(const SSyncNode* pSyncNode);
SSyncNode* syncNodeAcquire(int64_t rid);
void       syncNodeRelease(SSyncNode* pNode);
M
Minghao Li 已提交
231

M
Minghao Li 已提交
232 233 234 235 236 237 238 239 240 241 242 243 244 245
// raft state change --------------
// All of these take the node and return nothing; syncNodeUpdateTerm
// additionally takes the SyncTerm to apply.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeBecomeFollower(SSyncNode* pSyncNode);
void syncNodeBecomeLeader(SSyncNode* pSyncNode);

// Named <from>2<to> transitions.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);

// raft vote --------------
// syncNodeVoteForTerm takes the term and the SRaftId to vote for;
// syncNodeVoteForSelf takes only the node.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);

// for debug --------------
// Print/Log helpers for a node. The *2 variants take an extra string `s`.
// NOTE(review): `s` is presumably a caller-supplied label emitted with the
// dump -- confirm against the implementation.
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
void syncNodeLog(SSyncNode* pObj);
void syncNodeLog2(char* s, SSyncNode* pObj);

#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_INT_H*/