syncInt.h 5.0 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_SYNCINT_H
#define TDENGINE_SYNCINT_H

#ifdef __cplusplus
extern "C" {
#endif

#define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }}
#define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }}
#define sWarn(...)  { if (sDebugFlag & DEBUG_WARN)  { taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); }}
#define sInfo(...)  { if (sDebugFlag & DEBUG_INFO)  { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}

#define TAOS_SMSG_SYNC_DATA    1
#define TAOS_SMSG_FORWARD      2
#define TAOS_SMSG_FORWARD_RSP  3
#define TAOS_SMSG_SYNC_REQ     4 
#define TAOS_SMSG_SYNC_RSP     5
#define TAOS_SMSG_SYNC_MUST    6
#define TAOS_SMSG_STATUS       7

#define nodeRole    pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus

#pragma pack(push, 1)

typedef struct {
  char     type;        // msg type
  char     pversion;    // protocol version
  char     reserved[6]; // not used
  int32_t  vgId;        // vg ID
  int32_t  len;         // content length, does not include head
  // char     cont[];      // message content starts from here
} SSyncHead;

typedef struct {
  SSyncHead syncHead;
  uint16_t  port;
  char      fqdn[TSDB_FQDN_LEN];
  int32_t   sourceId;  // only for arbitrator
} SFirstPkt;

typedef struct {
  int8_t    role;
  uint64_t  version;
} SPeerStatus;

typedef struct {
  int8_t      role;
  int8_t      ack;
  uint64_t    version;
  SPeerStatus peersStatus[];
} SPeersStatus;

typedef struct {
  char      name[TSDB_FILENAME_LEN];
  uint32_t  magic;
  uint32_t  index;
  uint64_t  fversion;
77
  int64_t   size;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91
} SFileInfo;

typedef struct {
  int8_t    sync;
} SFileAck;

typedef struct {
  uint64_t  version;
  int32_t   code;
} SFwdRsp;
  
#pragma pack(pop)

typedef struct {
S
Shengliang Guan 已提交
92 93 94 95 96
  char *  buffer;
  int32_t bufferSize;
  char *  offset;
  int32_t forwards;
  int32_t code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109
} SRecvBuffer;

typedef struct {
  uint64_t  version;
  void     *mhandle;
  int8_t    acks;
  int8_t    nacks;
  int8_t    confirmed;
  int32_t   code;
  uint64_t  time;
} SFwdInfo;

typedef struct {
S
Shengliang Guan 已提交
110 111 112 113
  int32_t  first;
  int32_t  last;
  int32_t  fwds;  // number of forwards
  SFwdInfo fwdInfo[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
114 115 116
} SSyncFwds;

typedef struct SsyncPeer {
S
TD-1617  
Shengliang Guan 已提交
117 118 119 120 121 122 123 124 125
  int32_t  nodeId;
  uint32_t ip;
  uint16_t port;
  char     fqdn[TSDB_FQDN_LEN];   // peer ip string
  char     id[TSDB_EP_LEN + 32];  // peer vgId + end point
  int8_t   role;
  int8_t   sstatus;   // sync status
  uint64_t version;
  uint64_t sversion;  // track the peer version in retrieve process
S
Shengliang Guan 已提交
126 127 128 129
  int32_t  syncFd;
  int32_t  peerFd;          // forward FD
  int32_t  numOfRetrieves;  // number of retrieves tried
  int32_t  fileChanged;     // a flag to indicate file is changed during retrieving process
S
TD-1617  
Shengliang Guan 已提交
130 131
  void *   timer;
  void *   pConn;
S
Shengliang Guan 已提交
132 133 134
  int32_t  notifyFd;
  int32_t  watchNum;
  int32_t *watchFd;
S
TD-1617  
Shengliang Guan 已提交
135 136
  int8_t   refCount;  // reference count
  struct   SSyncNode *pSyncNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137 138 139 140 141 142 143
} SSyncPeer;

typedef struct SSyncNode {
  char         path[TSDB_FILENAME_LEN];
  int8_t       replica;
  int8_t       quorum;
  uint32_t     vgId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144
  int64_t      rid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157
  void        *ahandle;
  int8_t       selfIndex;
  SSyncPeer   *peerInfo[TAOS_SYNC_MAX_REPLICA+1];  // extra one for arbitrator
  SSyncPeer   *pMaster;
  int8_t       refCount;
  SRecvBuffer *pRecv;
  SSyncFwds   *pSyncFwds;  // saved forward info if quorum >1
  void        *pFwdTimer;
  FGetFileInfo    getFileInfo;
  FGetWalInfo     getWalInfo;
  FWriteToCache   writeToCache;
  FConfirmForward confirmForward;
  FNotifyRole     notifyRole;
158
  FNotifyFlowCtrl notifyFlowCtrl;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159 160 161 162 163
  FNotifyFileSynced notifyFileSynced;
  pthread_mutex_t mutex;
} SSyncNode;

// sync module global
S
Shengliang Guan 已提交
164 165
extern int32_t tsSyncNum;
extern char    tsNodeFqdn[TSDB_FQDN_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166 167 168

void *syncRetrieveData(void *param);
void *syncRestoreData(void *param);
S
Shengliang Guan 已提交
169 170 171 172 173
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void    syncRestartConnection(SSyncPeer *pPeer);
void    syncBroadcastStatus(SSyncNode *pNode);
void    syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174 175 176 177 178 179

#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_VNODEPEER_H