/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_H
#define _TD_LIBS_SYNC_H

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <tdatablock.h>
#include "taosdef.h"
#include "trpc.h"
#include "wal.h"
S
Shengliang Guan 已提交
28

M
Minghao Li 已提交
29
// Basic scalar types used throughout the sync interface.
typedef uint64_t SyncNodeId;   // identifier of a sync node (unsigned 64-bit)
typedef int32_t  SyncGroupId;  // identifier of a sync group (signed 32-bit); used for SSyncInfo.vgId
typedef int64_t  SyncIndex;    // index of a log entry (signed 64-bit)
typedef uint64_t SyncTerm;     // raft term (unsigned 64-bit)

// State of a sync node; this is the type returned by syncGetMyRole() and stored per replica in SNodesRole.
// Every enumerator has an explicit numeric value (100..103).
typedef enum {
  TAOS_SYNC_STATE_FOLLOWER = 100,
  TAOS_SYNC_STATE_CANDIDATE = 101,
  TAOS_SYNC_STATE_LEADER = 102,
  TAOS_SYNC_STATE_ERROR = 103,
} ESyncState;
S
Shengliang Guan 已提交
40

M
Minghao Li 已提交
41
// A pointer/length pair describing a block of memory.
// NOTE(review): ownership of "data" is not defined by this header — confirm with the code that fills it in.
typedef struct SSyncBuffer {
  void*  data;  // start of the buffer
  size_t len;   // presumably the number of bytes at "data" — confirm
} SSyncBuffer;

M
Minghao Li 已提交
46 47 48
// Network address of one sync node.
typedef struct SNodeInfo {
  uint16_t nodePort;                 // node sync port
  char     nodeFqdn[TSDB_FQDN_LEN];  // node FQDN
} SNodeInfo;

M
Minghao Li 已提交
51
typedef struct SSyncCfg {
M
Minghao Li 已提交
52
  int32_t   replicaNum;
M
Minghao Li 已提交
53
  int32_t   myIndex;
S
Shengliang Guan 已提交
54
  SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
M
Minghao Li 已提交
55
} SSyncCfg;
S
Shengliang Guan 已提交
56

M
Minghao Li 已提交
57
typedef struct SNodesRole {
M
Minghao Li 已提交
58 59
  int32_t    replicaNum;
  SNodeInfo  nodeInfo[TSDB_MAX_REPLICA];
S
Shengliang Guan 已提交
60 61 62
  ESyncState role[TSDB_MAX_REPLICA];
} SNodesRole;

M
Minghao Li 已提交
63 64 65 66 67
// Abstract definition of a snapshot; it is the argument type of SSyncFSM.FpTakeSnapshot and
// SSyncFSM.FpRestoreSnapshot.
typedef struct SSnapshot {
  void*     data;            // opaque snapshot payload; its format is not defined by this header
  SyncIndex lastApplyIndex;  // presumably the index of the last log entry applied to the snapshotted state — confirm
} SSnapshot;
S
Shengliang Guan 已提交
68

M
Minghao Li 已提交
69 70
typedef struct SSyncFSM {
  void* data;
S
Shengliang Guan 已提交
71

M
Minghao Li 已提交
72
  // when value in pMsg finish a raft flow, FpCommitCb is called, code indicates the result
M
Minghao Li 已提交
73
  // user can do something according to the code and isWeak. for example, write data into tsdb
M
Minghao Li 已提交
74 75
  void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
                     ESyncState state);
S
Shengliang Guan 已提交
76

M
Minghao Li 已提交
77
  // when value in pMsg has been written into local log store, FpPreCommitCb is called, code indicates the result
M
Minghao Li 已提交
78
  // user can do something according to the code and isWeak. for example, write data into tsdb
M
Minghao Li 已提交
79 80
  void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
                        ESyncState state);
S
Shengliang Guan 已提交
81

M
Minghao Li 已提交
82 83
  // when log entry is updated by a new one, FpRollBackCb is called
  // user can do something to roll back. for example, delete data from tsdb, or just ignore it
M
Minghao Li 已提交
84 85
  void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
                       ESyncState state);
S
Shengliang Guan 已提交
86

M
Minghao Li 已提交
87 88
  // user should implement this function, use "data" to take snapshot into "snapshot"
  int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
S
Shengliang Guan 已提交
89

M
Minghao Li 已提交
90 91
  // user should implement this function, restore "data" from "snapshot"
  int32_t (*FpRestoreSnapshot)(const SSnapshot* snapshot);
S
Shengliang Guan 已提交
92 93 94

} SSyncFSM;

M
Minghao Li 已提交
95 96 97
// Opaque log entry type: only pointers to it are used in this header; the full definition lives elsewhere.
typedef struct SSyncRaftEntry SSyncRaftEntry;

M
Minghao Li 已提交
98 99
// abstract definition of log store in raft
// SWal implements it
S
Shengliang Guan 已提交
100
typedef struct SSyncLogStore {
M
Minghao Li 已提交
101 102 103
  void* data;

  // append one log entry
M
Minghao Li 已提交
104
  int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
S
Shengliang Guan 已提交
105

M
Minghao Li 已提交
106
  // get one log entry, user need to free pEntry->pCont
M
Minghao Li 已提交
107
  SSyncRaftEntry* (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index);
S
Shengliang Guan 已提交
108

M
Minghao Li 已提交
109 110
  // truncate log with index, entries after the given index (>=index) will be deleted
  int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
S
Shengliang Guan 已提交
111

M
Minghao Li 已提交
112 113 114 115 116
  // return index of last entry
  SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore);

  // return term of last entry
  SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore);
S
Shengliang Guan 已提交
117

M
Minghao Li 已提交
118 119 120 121 122 123
  // update log store commit index with "index"
  int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);

  // return commit index of log
  SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);

S
Shengliang Guan 已提交
124 125
} SSyncLogStore;

M
Minghao Li 已提交
126 127 128
// raft need to persist two variables in storage: currentTerm, voteFor
typedef struct SStateMgr {
  void* data;
S
Shengliang Guan 已提交
129

M
Minghao Li 已提交
130 131
  int32_t (*getCurrentTerm)(struct SStateMgr* pMgr, SyncTerm* pCurrentTerm);
  int32_t (*persistCurrentTerm)(struct SStateMgr* pMgr, SyncTerm pCurrentTerm);
S
Shengliang Guan 已提交
132

M
Minghao Li 已提交
133 134
  int32_t (*getVoteFor)(struct SStateMgr* pMgr, SyncNodeId* pVoteFor);
  int32_t (*persistVoteFor)(struct SStateMgr* pMgr, SyncNodeId voteFor);
S
Shengliang Guan 已提交
135

M
Minghao Li 已提交
136 137
  int32_t (*getSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg);
  int32_t (*persistSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg);
S
Shengliang Guan 已提交
138

M
Minghao Li 已提交
139
} SStateMgr;
S
Shengliang Guan 已提交
140

M
Minghao Li 已提交
141 142 143
typedef struct SSyncInfo {
  SyncGroupId vgId;
  SSyncCfg    syncCfg;
M
Minghao Li 已提交
144
  char        path[TSDB_FILENAME_LEN];
M
Minghao Li 已提交
145
  SWal*       pWal;
M
Minghao Li 已提交
146
  SSyncFSM*   pFsm;
M
Minghao Li 已提交
147 148 149

  void* rpcClient;
  int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
M
Minghao Li 已提交
150 151
  void* queue;
  int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
M
Minghao Li 已提交
152

S
Shengliang Guan 已提交
153 154
} SSyncInfo;

M
Minghao Li 已提交
155 156
// Opaque sync node type: declared here without a definition; the full definition lives elsewhere.
typedef struct SSyncNode SSyncNode;
S
Shengliang Guan 已提交
157 158 159 160

// Module-wide setup and teardown.
// Declared with (void) so that C callers get a real prototype: before C23, an empty parameter list in C leaves the
// arguments unchecked. For C++ callers the two spellings are equivalent.
int32_t syncInit(void);
void    syncCleanUp(void);

M
Minghao Li 已提交
161 162 163 164
int64_t    syncStart(const SSyncInfo* pSyncInfo);
void       syncStop(int64_t rid);
int32_t    syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t    syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
M
Minghao Li 已提交
165
ESyncState syncGetMyRole(int64_t rid);
S
Shengliang Guan 已提交
166

M
Minghao Li 已提交
167 168 169
// Propose with a sequence number, to implement linearizable semantics.
// NOTE(review): how seqNum is interpreted is not visible in this header — confirm against the implementation.
int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum);

M
Minghao Li 已提交
170
// for compatibility, the same as syncPropose
M
Minghao Li 已提交
171
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
M
Minghao Li 已提交
172

S
Shengliang Guan 已提交
173 174 175 176 177 178 179
// Declared here, defined elsewhere.
// NOTE(review): presumably the debug/log level flag of the sync module — confirm.
extern int32_t sDebugFlag;

#ifdef __cplusplus
}
#endif

#endif /*_TD_LIBS_SYNC_H*/