syncInt.h 4.5 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
TD-2428  
Shengliang Guan 已提交
16 17
#ifndef TDENGINE_SYNC_INT_H
#define TDENGINE_SYNC_INT_H
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18 19 20 21

#ifdef __cplusplus
extern "C" {
#endif
S
TD-2428  
Shengliang Guan 已提交
22 23
#include "syncMsg.h"
#include "twal.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24 25 26 27 28 29 30 31

/*
 * Sync-module logging macros. Each one forwards its printf-style arguments to
 * taosPrintLog() with a "SYN ..." prefix, but only when the matching DEBUG_*
 * bit is set in sDebugFlag.
 *
 * The bodies are wrapped in do { ... } while (0) so that an invocation followed
 * by a semicolon is exactly one statement. The earlier bare-brace form expanded
 * to "{ ... };", which breaks "if (x) sError(...); else ..." because the stray
 * ';' terminates the if before the else is seen.
 * Every invocation must be terminated with a semicolon.
 */
#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sWarn(...)  do { if (sDebugFlag & DEBUG_WARN)  { taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sInfo(...)  do { if (sDebugFlag & DEBUG_INFO)  { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)
#define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); } } while (0)

S
TD-1927  
Shengliang Guan 已提交
32 33 34
// Compile-time limits and timer settings of the sync module.
#define SYNC_TCP_THREADS 2
#define SYNC_MAX_NUM 2

// Sum of the largest WAL size, the WAL head, the sync head and 16 extra units.
// NOTE(review): presumably the upper bound (in bytes) of one sync message - confirm.
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)  // 5 * 2^20; presumably bytes - confirm

#define SYNC_MAX_FWDS 512
#define SYNC_FWD_TIMER 300                // NOTE(review): unit not stated; presumably ms like the timers below - confirm
#define SYNC_ROLE_TIMER 15000             // ms
#define SYNC_CHECK_INTERVAL 1000          // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10  // ms
S
TD-2041  
Shengliang Guan 已提交
43

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
44 45 46 47 48
/*
 * Shorthand for fields of the peer entry stored at pNode->selfIndex.
 * Each macro expands to an lvalue and needs a variable named pNode
 * (with peerInfo[] and selfIndex members) in scope where it is used.
 */
#define nodeRole    (pNode->peerInfo[pNode->selfIndex]->role)
#define nodeVersion (pNode->peerInfo[pNode->selfIndex]->version)
#define nodeSStatus (pNode->peerInfo[pNode->selfIndex]->sstatus)

// Receive-buffer bookkeeping; SSyncNode holds a pointer to one of these (pRecv).
typedef struct {
  char *  buffer;      // buffer storage (allocation and ownership are not visible in this header)
  int32_t bufferSize;  // NOTE(review): presumably the capacity of buffer - confirm
  char *  offset;      // NOTE(review): presumably the current position inside buffer - confirm
  int32_t forwards;
  int32_t code;
} SRecvBuffer;

// One forward record; SSyncFwds stores an array of these in fwdInfo[].
typedef struct {
  uint64_t  version;
  void     *mhandle;    // opaque handle; its meaning is defined by code outside this header
  int8_t    acks;       // NOTE(review): presumably count of positive acknowledgements - confirm
  int8_t    nacks;      // NOTE(review): presumably count of negative acknowledgements - confirm
  int8_t    confirmed;
  int32_t   code;
  int64_t   time;       // NOTE(review): unit and epoch are not visible here - confirm
} SFwdInfo;

typedef struct {
S
Shengliang Guan 已提交
67 68 69 70
  int32_t  first;
  int32_t  last;
  int32_t  fwds;  // number of forwards
  SFwdInfo fwdInfo[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
71 72 73
} SSyncFwds;

typedef struct SsyncPeer {
S
TD-1617  
Shengliang Guan 已提交
74 75 76
  int32_t  nodeId;
  uint32_t ip;
  uint16_t port;
S
TD-2157  
Shengliang Guan 已提交
77 78
  int8_t   role;
  int8_t   sstatus;               // sync status
S
TD-1617  
Shengliang Guan 已提交
79 80 81
  char     fqdn[TSDB_FQDN_LEN];   // peer ip string
  char     id[TSDB_EP_LEN + 32];  // peer vgId + end point
  uint64_t version;
S
TD-2157  
Shengliang Guan 已提交
82
  uint64_t sversion;        // track the peer version in retrieve process
S
TD-1926  
Shengliang Guan 已提交
83 84
  uint64_t lastFileVer;     // track the file version while retrieve
  uint64_t lastWalVer;      // track the wal version while retrieve
85 86
  SOCKET   syncFd;
  SOCKET   peerFd;          // forward FD
S
Shengliang Guan 已提交
87 88
  int32_t  numOfRetrieves;  // number of retrieves tried
  int32_t  fileChanged;     // a flag to indicate file is changed during retrieving process
S
TD-1927  
Shengliang Guan 已提交
89
  int32_t  refCount;
S
TD-2680  
Shengliang Guan 已提交
90
  int8_t   isArb;
S
TD-1927  
Shengliang Guan 已提交
91
  int64_t  rid;
S
TD-1617  
Shengliang Guan 已提交
92 93 94
  void *   timer;
  void *   pConn;
  struct   SSyncNode *pSyncNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95 96 97 98 99 100
} SSyncPeer;

typedef struct SSyncNode {
  char         path[TSDB_FILENAME_LEN];
  int8_t       replica;
  int8_t       quorum;
S
TD-2157  
Shengliang Guan 已提交
101
  int8_t       selfIndex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
  uint32_t     vgId;
S
TD-1927  
Shengliang Guan 已提交
103
  int32_t      refCount;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104
  int64_t      rid;
S
TD-2166  
Shengliang Guan 已提交
105 106
  SSyncPeer *  peerInfo[TAOS_SYNC_MAX_REPLICA + 1];  // extra one for arbitrator
  SSyncPeer *  pMaster;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107
  SRecvBuffer *pRecv;
S
TD-2166  
Shengliang Guan 已提交
108 109 110
  SSyncFwds *  pSyncFwds;  // saved forward info if quorum >1
  void *       pFwdTimer;
  void *       pRoleTimer;
S
TD-2798  
Shengliang Guan 已提交
111 112 113
  void *       pTsdb;
  FGetWalInfo       getWalInfoFp;
  FWriteToCache     writeToCacheFp;
S
TD-2153  
Shengliang Guan 已提交
114
  FConfirmForward   confirmForward;
S
TD-2798  
Shengliang Guan 已提交
115 116 117 118 119
  FNotifyRole       notifyRoleFp;
  FNotifyFlowCtrl   notifyFlowCtrlFp;
  FStartSyncFile    startSyncFileFp;
  FStopSyncFile     stopSyncFileFp;
  FGetVersion       getVersionFp;
S
TD-3308  
Shengliang Guan 已提交
120
  FResetVersion     resetVersionFp;
S
TD-2798  
Shengliang Guan 已提交
121 122
  FSendFile         sendFileFp;
  FRecvFile         recvFileFp;
S
TD-2153  
Shengliang Guan 已提交
123
  pthread_mutex_t   mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124 125 126
} SSyncNode;

// Sync-module globals: declarations only, the objects are defined elsewhere.
extern int32_t tsSyncNum;
extern char    tsNodeFqdn[TSDB_FQDN_LEN];
extern char *  syncStatus[];  // NOTE(review): presumably display names indexed by sync status value - confirm
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130

S
TD-1927  
Shengliang Guan 已提交
131 132 133 134 135 136 137
// Functions implemented elsewhere in the sync module.

// Both take and return an untyped pointer.
// NOTE(review): the signature matches a pthread start routine - confirm how they are launched.
void *syncRetrieveData(void *param);
void *syncRestoreData(void *param);

int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode);

// NOTE(review): presumably a reference-counted lookup by rid and its matching release - confirm.
SSyncPeer *syncAcquirePeer(int64_t rid);
void syncReleasePeer(SSyncPeer *pPeer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138 139 140 141 142 143

#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_SYNC_INT_H