syncInt.h 5.4 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_SYNCINT_H
#define TDENGINE_SYNCINT_H

#ifdef __cplusplus
extern "C" {
#endif

/*
 * Sync-module logging macros.
 *
 * Each macro forwards its printf-style arguments to taosPrintLog() with a
 * "SYN ..." prefix, but only when the matching DEBUG_* bit is set in
 * sDebugFlag. sInfo, sDebug and sTrace share the plain "SYN " prefix.
 *
 * The bodies are wrapped in do { ... } while (0) so that an invocation followed
 * by a semicolon is exactly one statement. With a bare { ... } body, writing
 * `if (cond) sError("..."); else ...` fails to compile because the semicolon
 * terminates the if statement before the else is reached.
 */
#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sWarn(...)  do { if (sDebugFlag & DEBUG_WARN)  { taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sInfo(...)  do { if (sDebugFlag & DEBUG_INFO)  { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)

S
TD-2157  
Shengliang Guan 已提交
30
/*
 * Type codes for sync messages.
 *
 * Values are assigned explicitly and start at 1; 0 is not a defined type.
 * NOTE(review): presumably this is the value stored in SSyncHead.type --
 * confirm against the code that fills in the header.
 */
typedef enum {
  TAOS_SMSG_SYNC_DATA     = 1,
  TAOS_SMSG_FORWARD       = 2,
  TAOS_SMSG_FORWARD_RSP   = 3,
  TAOS_SMSG_SYNC_REQ      = 4,
  TAOS_SMSG_SYNC_RSP      = 5,
  TAOS_SMSG_SYNC_MUST     = 6,
  TAOS_SMSG_STATUS        = 7,
  TAOS_SMSG_SYNC_DATA_RSP = 8,
} ESyncMsgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
40

S
TD-2041  
Shengliang Guan 已提交
41
/*
 * Sync-module size and timing constants.
 *
 * SYNC_MAX_SIZE is the largest WAL payload plus one SWalHead, one SSyncHead
 * and 16 spare bytes. Because it contains sizeof, it has type size_t and
 * cannot be used in #if expressions.
 *
 * NOTE(review): the units of the timer and wait values are not visible in
 * this header (the timers are presumably milliseconds) -- confirm at the
 * call sites.
 */
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)  // 5 MiB
#define SYNC_FWD_TIMER  300
#define SYNC_ROLE_TIMER 10000
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3
S
TD-2041  
Shengliang Guan 已提交
46

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
/*
 * Shorthand for fields of this node's own entry in its peer table, i.e.
 * pNode->peerInfo[pNode->selfIndex].
 *
 * These macros silently capture a variable named pNode from the enclosing
 * scope, so they can only be used where such a variable is visible. Each one
 * expands to an lvalue and may be read or assigned.
 */
#define nodeRole    (pNode->peerInfo[pNode->selfIndex]->role)
#define nodeVersion (pNode->peerInfo[pNode->selfIndex]->version)
#define nodeSStatus (pNode->peerInfo[pNode->selfIndex]->sstatus)

#pragma pack(push, 1)

/*
 * Header of a sync message; the message content follows it directly.
 *
 * Declared inside the #pragma pack(push, 1) region, so the fields are laid
 * out without padding. Do not reorder or resize fields.
 */
typedef struct {
  char     type;        // msg type (presumably an ESyncMsgType value -- confirm)
  char     pversion;    // protocol version
  char     reserved[6]; // not used
  int32_t  vgId;        // vg ID
  int32_t  len;         // content length, does not include head
  // message content starts here, right after len; no cont[] member is declared
} SSyncHead;

/*
 * Packed message made of a sync header followed by the sender's port, FQDN
 * and a source id.
 *
 * NOTE(review): judging by the name this is the first packet exchanged on a
 * new connection -- confirm against the sender.
 */
typedef struct {
  SSyncHead syncHead;             // common sync message header
  uint16_t  port;
  char      fqdn[TSDB_FQDN_LEN];
  int32_t   sourceId;  // only for arbitrator
} SFirstPkt;

S
TD-2211  
Shengliang Guan 已提交
69 70 71 72
/*
 * One-byte packed message carrying a single sync flag.
 * NOTE(review): by its name this is the reply to SFirstPkt; the meaning of
 * the sync values is not visible in this header -- confirm at the call sites.
 */
typedef struct {
  int8_t sync;
} SFirstPktRsp;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73 74 75 76 77 78 79 80
/*
 * Role and version of one peer. Used as the element type of the trailing
 * array in SPeersStatus. Packed: 9 bytes, no padding between role and version.
 */
typedef struct {
  int8_t    role;
  uint64_t  version;
} SPeerStatus;

typedef struct {
  int8_t      role;
  int8_t      ack;
S
Shengliang Guan 已提交
81 82 83
  int8_t      type;
  int8_t      reserved[3];
  uint16_t    tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
84 85 86 87 88 89 90 91 92
  uint64_t    version;
  SPeerStatus peersStatus[];
} SPeersStatus;

/*
 * Description of one file: name, magic, index, version and size.
 *
 * Declared inside the #pragma pack(push, 1) region, so the fields are laid
 * out without padding.
 * NOTE(review): presumably exchanged between peers while retrieving files --
 * confirm against the retrieve/restore code.
 */
typedef struct {
  char      name[TSDB_FILENAME_LEN];
  uint32_t  magic;
  uint32_t  index;
  uint64_t  fversion;
  int64_t   size;
} SFileInfo;

/*
 * One-byte packed message carrying a single sync flag.
 * NOTE(review): by its name this acknowledges an SFileInfo; the meaning of
 * the sync values is not visible in this header -- confirm at the call sites.
 */
typedef struct {
  int8_t    sync;
} SFileAck;

/*
 * Packed pair of a version and a result code (12 bytes).
 * NOTE(review): by its name this is the response to a forwarded write,
 * identified by version -- confirm against the forward handling code.
 */
typedef struct {
  uint64_t  version;
  int32_t   code;
} SFwdRsp;
  
#pragma pack(pop)

/*
 * Receive buffer state: a buffer pointer with its size, a second pointer
 * named offset, a forwards counter and a result code.
 *
 * Not packed; it is declared after #pragma pack(pop).
 * NOTE(review): offset presumably points at the current position inside
 * buffer -- confirm against the code that fills the buffer.
 */
typedef struct {
  char *  buffer;
  int32_t bufferSize;
  char *  offset;
  int32_t forwards;
  int32_t code;
} SRecvBuffer;

/*
 * Bookkeeping for one forwarded item: its version, an opaque handle, ack and
 * nack counters, a confirmed flag, a result code and a time value.
 *
 * Used as the element type of the trailing array in SSyncFwds.
 * NOTE(review): the unit and clock of the time field are not visible in this
 * header -- confirm where it is set.
 */
typedef struct {
  uint64_t  version;
  void     *mhandle;
  int8_t    acks;
  int8_t    nacks;
  int8_t    confirmed;
  int32_t   code;
  int64_t   time;
} SFwdInfo;

typedef struct {
S
Shengliang Guan 已提交
126 127 128 129
  int32_t  first;
  int32_t  last;
  int32_t  fwds;  // number of forwards
  SFwdInfo fwdInfo[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130 131 132
} SSyncFwds;

typedef struct SsyncPeer {
S
TD-1617  
Shengliang Guan 已提交
133 134 135
  int32_t  nodeId;
  uint32_t ip;
  uint16_t port;
S
TD-2157  
Shengliang Guan 已提交
136 137
  int8_t   role;
  int8_t   sstatus;               // sync status
S
TD-1617  
Shengliang Guan 已提交
138 139 140
  char     fqdn[TSDB_FQDN_LEN];   // peer ip string
  char     id[TSDB_EP_LEN + 32];  // peer vgId + end point
  uint64_t version;
S
TD-2157  
Shengliang Guan 已提交
141
  uint64_t sversion;        // track the peer version in retrieve process
S
Shengliang Guan 已提交
142 143 144 145
  int32_t  syncFd;
  int32_t  peerFd;          // forward FD
  int32_t  numOfRetrieves;  // number of retrieves tried
  int32_t  fileChanged;     // a flag to indicate file is changed during retrieving process
S
TD-1617  
Shengliang Guan 已提交
146 147
  void *   timer;
  void *   pConn;
S
Shengliang Guan 已提交
148 149 150
  int32_t  notifyFd;
  int32_t  watchNum;
  int32_t *watchFd;
S
TD-2157  
Shengliang Guan 已提交
151
  int32_t  refCount;  // reference count
S
TD-1617  
Shengliang Guan 已提交
152
  struct   SSyncNode *pSyncNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
153 154 155 156 157 158
} SSyncPeer;

typedef struct SSyncNode {
  char         path[TSDB_FILENAME_LEN];
  int8_t       replica;
  int8_t       quorum;
S
TD-2157  
Shengliang Guan 已提交
159
  int8_t       selfIndex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160
  uint32_t     vgId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161
  int64_t      rid;
S
TD-2166  
Shengliang Guan 已提交
162 163
  SSyncPeer *  peerInfo[TAOS_SYNC_MAX_REPLICA + 1];  // extra one for arbitrator
  SSyncPeer *  pMaster;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
164
  SRecvBuffer *pRecv;
S
TD-2166  
Shengliang Guan 已提交
165 166 167
  SSyncFwds *  pSyncFwds;  // saved forward info if quorum >1
  void *       pFwdTimer;
  void *       pRoleTimer;
S
TD-2153  
Shengliang Guan 已提交
168 169 170 171 172 173
  FGetFileInfo      getFileInfo;
  FGetWalInfo       getWalInfo;
  FWriteToCache     writeToCache;
  FConfirmForward   confirmForward;
  FNotifyRole       notifyRole;
  FNotifyFlowCtrl   notifyFlowCtrl;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174
  FNotifyFileSynced notifyFileSynced;
S
TD-2153  
Shengliang Guan 已提交
175
  pthread_mutex_t   mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
176 177 178
} SSyncNode;

// sync module global
S
Shengliang Guan 已提交
179 180
/*
 * Variables shared across the sync module. They are only declared here; the
 * definitions live in a source file not visible in this header.
 * NOTE(review): syncStatus is presumably a table of printable status names --
 * confirm how it is indexed before using it.
 */
extern int32_t tsSyncNum;
extern char    tsNodeFqdn[TSDB_FQDN_LEN];
extern char *  syncStatus[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182 183 184

void *syncRetrieveData(void *param);
void *syncRestoreData(void *param);
S
Shengliang Guan 已提交
185 186 187 188 189
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void    syncRestartConnection(SSyncPeer *pPeer);
void    syncBroadcastStatus(SSyncNode *pNode);
void    syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190 191 192 193 194 195

#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_SYNCINT_H