// syncInt.h
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_SYNCINT_H
#define TDENGINE_SYNCINT_H

#ifdef __cplusplus
extern "C" {
#endif

/*
 * Sync-module logging macros. Each one calls taosPrintLog() with a "SYN"
 * prefix, sDebugFlag and the caller's arguments when the matching DEBUG_*
 * bit is set in sDebugFlag.
 *
 * The bodies are wrapped in do { ... } while (0) so that an invocation
 * followed by ';' forms exactly one statement. With a bare { ... } body the
 * trailing ';' becomes a separate empty statement, which breaks code such as
 *   if (cond) sError("..."); else ...
 */
#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sWarn(...)  do { if (sDebugFlag & DEBUG_WARN)  { taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sInfo(...)  do { if (sDebugFlag & DEBUG_INFO)  { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)

// Sync message type codes (values 1..7).
// NOTE(review): presumably the values carried in SSyncHead.type -- confirm against the senders.
#define TAOS_SMSG_SYNC_DATA   1
#define TAOS_SMSG_FORWARD     2
#define TAOS_SMSG_FORWARD_RSP 3
#define TAOS_SMSG_SYNC_REQ    4
#define TAOS_SMSG_SYNC_RSP    5
#define TAOS_SMSG_SYNC_MUST   6
#define TAOS_SMSG_STATUS      7

// Shorthand for fields of this node's own entry in pNode->peerInfo[].
// These expand to lvalues and rely on a variable named `pNode` being in
// scope at the point of use.
#define nodeRole    (pNode->peerInfo[pNode->selfIndex]->role)
#define nodeVersion (pNode->peerInfo[pNode->selfIndex]->version)
#define nodeSStatus (pNode->peerInfo[pNode->selfIndex]->sstatus)

#pragma pack(push, 1)

// Fixed-size header placed in front of sync message content.
typedef struct {
  char    type;         // msg type
  char    pversion;     // protocol version
  char    reserved[6];  // not used
  int32_t vgId;         // vg ID
  int32_t len;          // content length, does not include head
  // char cont[];       // message content starts from here
} SSyncHead;

// A sync header followed by the sender's port, FQDN and a source id.
// NOTE(review): presumably the first packet sent on a new connection -- confirm.
typedef struct {
  SSyncHead syncHead;
  uint16_t  port;
  char      fqdn[TSDB_FQDN_LEN];
  int32_t   sourceId;              // only for arbitrator
} SFirstPkt;

// Role and version of a single peer.
typedef struct {
  int8_t   role;
  uint64_t version;
} SPeerStatus;

typedef struct {
  int8_t      role;
  int8_t      ack;
S
Shengliang Guan 已提交
68 69 70
  int8_t      type;
  int8_t      reserved[3];
  uint16_t    tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
71 72 73 74 75 76 77 78 79
  uint64_t    version;
  SPeerStatus peersStatus[];
} SPeersStatus;

// Description of one file: name, magic, index, file version and size.
// NOTE(review): units of `size` and the meaning of `magic` are not visible here -- confirm.
typedef struct {
  char      name[TSDB_FILENAME_LEN];
  uint32_t  magic;
  uint32_t  index;
  uint64_t  fversion;
  int64_t   size;
} SFileInfo;

// One-byte acknowledgement carrying a single `sync` flag.
typedef struct {
  int8_t sync;
} SFileAck;

// Forward response: a version together with a result code.
typedef struct {
  uint64_t version;
  int32_t  code;
} SFwdRsp;
  
#pragma pack(pop)

// Receive buffer bookkeeping: a buffer, its size, a position pointer and counters.
// NOTE(review): `offset` presumably points into `buffer` -- confirm with the code that fills it.
typedef struct {
  char *buffer;
  int   bufferSize;
  char *offset;
  int   forwards;
  int   code;
} SRecvBuffer;

// Bookkeeping for one forwarded item: version, opaque handle, ack counters,
// result code and a time value.
// NOTE(review): the unit of `time` is not visible here -- confirm.
typedef struct {
  uint64_t version;
  void    *mhandle;
  int8_t   acks;
  int8_t   nacks;
  int8_t   confirmed;
  int32_t  code;
  uint64_t time;
} SFwdInfo;

// Collection of SFwdInfo entries with first/last indices and a count.
// NOTE(review): first/last suggest a circular queue -- confirm with the code that updates them.
typedef struct {
  int      first;
  int      last;
  int      fwds;       // number of forwards
  SFwdInfo fwdInfo[];  // flexible array member
} SSyncFwds;

typedef struct SsyncPeer {
S
TD-1617  
Shengliang Guan 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
  int32_t  nodeId;
  uint32_t ip;
  uint16_t port;
  char     fqdn[TSDB_FQDN_LEN];   // peer ip string
  char     id[TSDB_EP_LEN + 32];  // peer vgId + end point
  int8_t   role;
  int8_t   sstatus;   // sync status
  uint64_t version;
  uint64_t sversion;  // track the peer version in retrieve process
  int      syncFd;
  int      peerFd;          // forward FD
  int      numOfRetrieves;  // number of retrieves tried
  int      fileChanged;     // a flag to indicate file is changed during retrieving process
  void *   timer;
  void *   pConn;
  int      notifyFd;
  int      watchNum;
  int *    watchFd;
  int8_t   refCount;  // reference count
  struct   SSyncNode *pSyncNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
} SSyncPeer;

typedef struct SSyncNode {
  char         path[TSDB_FILENAME_LEN];
  int8_t       replica;
  int8_t       quorum;
  uint32_t     vgId;
  void        *ahandle;
  int8_t       selfIndex;
  SSyncPeer   *peerInfo[TAOS_SYNC_MAX_REPLICA+1];  // extra one for arbitrator
  SSyncPeer   *pMaster;
  int8_t       refCount;
  SRecvBuffer *pRecv;
  SSyncFwds   *pSyncFwds;  // saved forward info if quorum >1
  void        *pFwdTimer;
  FGetFileInfo    getFileInfo;
  FGetWalInfo     getWalInfo;
  FWriteToCache   writeToCache;
  FConfirmForward confirmForward;
  FNotifyRole     notifyRole;
160
  FNotifyFlowCtrl notifyFlowCtrl;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
  FNotifyFileSynced notifyFileSynced;
  pthread_mutex_t mutex;
} SSyncNode;

// Sync-module globals; declared here, defined in another translation unit.
extern int tsSyncNum;
extern char tsNodeFqdn[TSDB_FQDN_LEN];

// Internal sync-module entry points (implemented elsewhere).

// NOTE(review): the void *(void *) signature matches a pthread start routine;
// presumably these two run as threads -- confirm at the call sites.
void *syncRetrieveData(void *param);
void *syncRestoreData(void *param);

// Per-peer operations.
int  syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void syncRestartConnection(SSyncPeer *pPeer);
void syncAddPeerRef(SSyncPeer *pPeer);
int  syncDecPeerRef(SSyncPeer *pPeer);

// Per-node operations.
void syncBroadcastStatus(SSyncNode *pNode);

#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_SYNCINT_H