syncMain.c 42.2 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17 18 19 20 21
#include "os.h"
#include "hash.h"
#include "tlog.h"
#include "tutil.h"
#include "ttimer.h"
22
#include "tref.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24 25 26 27 28 29 30 31 32
#include "tsocket.h"
#include "tglobal.h"
#include "taoserror.h"
#include "taosTcpPool.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"

// global configurable
S
Shengliang Guan 已提交
33 34 35 36 37
int32_t tsMaxSyncNum = 2;
int32_t tsSyncTcpThreads = 2;
int32_t tsMaxWatchFiles = 500;
int32_t tsMaxFwdInfo = 200;
int32_t tsSyncTimer = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
38 39

// module global, not configurable
S
Shengliang Guan 已提交
40 41
int32_t tsSyncNum;  // number of sync in process in whole system
char    tsNodeFqdn[TSDB_FQDN_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42

R
TD-1382  
root 已提交
43
static ttpool_h tsTcpPool;
S
Shengliang Guan 已提交
44 45 46
static void *   tsSyncTmrCtrl = NULL;
static void *   tsVgIdHash;
static int32_t  tsSyncRefId = -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47 48

// local functions
S
Shengliang Guan 已提交
49 50 51
static void    syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void    syncRecoverFromMaster(SSyncPeer *pPeer);
static void    syncCheckPeerConnection(void *param, void *tmrId);
S
TD-2153  
Shengliang Guan 已提交
52
static void    syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
S
Shengliang Guan 已提交
53 54 55 56 57 58 59 60
static void    syncProcessBrokenLink(void *param);
static int32_t syncProcessPeerMsg(void *param, void *buffer);
static void    syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void    syncRemovePeer(SSyncPeer *pPeer);
static void    syncAddArbitrator(SSyncNode *pNode);
static void    syncFreeNode(void *);
static void    syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
static void    syncMonitorFwdInfos(void *param, void *tmrId);
S
TD-2157  
Shengliang Guan 已提交
61
static void    syncMonitorNodeRole(void *param, void *tmrId);
S
Shengliang Guan 已提交
62 63 64 65
static void    syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static void    syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void    syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
66 67 68 69 70
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);

// Printable names for a node/peer role; indexed by the role value
// (e.g. syncRole[nodeRole], syncRole[newPeerRole] in the log calls below).
// NOTE(review): the order is assumed to match the TAOS_SYNC_ROLE_* numbering - confirm against its declaration.
char* syncRole[] = {
  "offline",
  "unsynced",
  "syncing",
  "slave",
  "master"
};

S
TD-2157  
Shengliang Guan 已提交
76 77 78 79 80 81 82 83
// Printable names for a sync status value, looked up by index.
// NOTE(review): the matching status enum is not visible here - confirm the order against it.
char *syncStatus[] = {
  [0] = "init",
  [1] = "start",
  [2] = "file",
  [3] = "cache",
  [4] = "invalid",
};

S
Shengliang Guan 已提交
84 85 86 87 88 89
// Kind of a peers-status message; passed as the `type` argument of
// syncSendPeersStatusMsgToPeer(). The numbering is spelled out because the
// values are used as indexes into statusType[] below.
typedef enum {
  SYNC_STATUS_BROADCAST = 0,
  SYNC_STATUS_BROADCAST_RSP = 1,
  SYNC_STATUS_SETUP_CONN = 2,
  SYNC_STATUS_SETUP_CONN_RSP = 3,
  SYNC_STATUS_EXCHANGE_DATA = 4,
  SYNC_STATUS_EXCHANGE_DATA_RSP = 5,
  SYNC_STATUS_CHECK_ROLE = 6,
  SYNC_STATUS_CHECK_ROLE_RSP = 7
} ESyncStatusType;

// Printable names for ESyncStatusType, in the same order as the enumerators.
char *statusType[] = {
  "broadcast",
  "broadcast-rsp",
  "setup-conn",
  "setup-conn-rsp",
  "exchange-data",
  "exchange-data-rsp",
  "check-role",
  "check-role-rsp"
};

// Generate a transaction id: the low 16 bits of taosRand().
uint16_t syncGenTranId() {
  uint16_t tranId = (uint16_t)(taosRand() & 0xFFFF);
  return tranId;
}

S
Shengliang Guan 已提交
110
int32_t syncInit() {
S
TD-2166  
Shengliang Guan 已提交
111
  SPoolInfo info = {0};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112 113 114 115

  info.numOfThreads = tsSyncTcpThreads;
  info.serverIp = 0;
  info.port = tsSyncPort;
S
TD-2041  
Shengliang Guan 已提交
116
  info.bufferSize = SYNC_MAX_SIZE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
117 118 119 120 121
  info.processBrokenLink = syncProcessBrokenLink;
  info.processIncomingMsg = syncProcessPeerMsg;
  info.processIncomingConn = syncProcessIncommingConnection;

  tsTcpPool = taosOpenTcpThreadPool(&info);
S
Shengliang Guan 已提交
122 123 124 125
  if (tsTcpPool == NULL) {
    sError("failed to init tcpPool");
    return -1;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126

S
Shengliang Guan 已提交
127 128
  tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
  if (tsSyncTmrCtrl == NULL) {
S
Shengliang Guan 已提交
129
    sError("failed to init tmrCtrl");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130 131
    taosCloseTcpThreadPool(tsTcpPool);
    tsTcpPool = NULL;
S
Shengliang Guan 已提交
132
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133
  }
S
Shengliang Guan 已提交
134

S
TD-2166  
Shengliang Guan 已提交
135
  tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
S
Shengliang Guan 已提交
136 137 138
  if (tsVgIdHash == NULL) {
    sError("failed to init tsVgIdHash");
    taosTmrCleanUp(tsSyncTmrCtrl);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139 140
    taosCloseTcpThreadPool(tsTcpPool);
    tsTcpPool = NULL;
S
Shengliang Guan 已提交
141
    tsSyncTmrCtrl = NULL;
S
Shengliang Guan 已提交
142 143
    return -1;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144

145 146 147 148 149 150
  tsSyncRefId = taosOpenRef(200, syncFreeNode);
  if (tsSyncRefId < 0) {
    syncCleanUp();
    return -1;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151
  tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
S
Shengliang Guan 已提交
152 153 154 155 156 157 158 159 160 161 162
  sInfo("sync module initialized successfully");

  return 0;
}

void syncCleanUp() {
  if (tsTcpPool) {
    taosCloseTcpThreadPool(tsTcpPool);
    tsTcpPool = NULL;
  }

S
Shengliang Guan 已提交
163 164 165
  if (tsSyncTmrCtrl) {
    taosTmrCleanUp(tsSyncTmrCtrl);
    tsSyncTmrCtrl = NULL;
S
Shengliang Guan 已提交
166 167
  }

S
Shengliang Guan 已提交
168 169 170
  if (tsVgIdHash) {
    taosHashCleanup(tsVgIdHash);
    tsVgIdHash = NULL;
S
Shengliang Guan 已提交
171 172
  }

173
  taosCloseRef(tsSyncRefId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174
  tsSyncRefId = -1;
175

S
Shengliang Guan 已提交
176
  sInfo("sync module is cleaned up");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177 178
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179
int64_t syncStart(const SSyncInfo *pInfo) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180 181
  const SSyncCfg *pCfg = &pInfo->syncCfg;

S
TD-2153  
Shengliang Guan 已提交
182
  SSyncNode *pNode = calloc(sizeof(SSyncNode), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183 184 185
  if (pNode == NULL) {
    sError("no memory to allocate syncNode");
    terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
186
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187 188 189 190 191 192 193 194 195 196
  }

  tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
  pthread_mutex_init(&pNode->mutex, NULL);

  pNode->getFileInfo = pInfo->getFileInfo;
  pNode->getWalInfo = pInfo->getWalInfo;
  pNode->writeToCache = pInfo->writeToCache;
  pNode->notifyRole = pInfo->notifyRole;
  pNode->confirmForward = pInfo->confirmForward;
197
  pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
198
  pNode->notifyFileSynced = pInfo->notifyFileSynced;
S
TD-1926  
Shengliang Guan 已提交
199
  pNode->getVersion = pInfo->getVersion;
R
TD-1382  
root 已提交
200

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201 202 203 204
  pNode->selfIndex = -1;
  pNode->vgId = pInfo->vgId;
  pNode->replica = pCfg->replica;
  pNode->quorum = pCfg->quorum;
205 206
  if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
207 208
  pNode->rid = taosAddRef(tsSyncRefId, pNode);
  if (pNode->rid < 0) {
209
    syncFreeNode(pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210
    return -1;
211 212
  }

S
Shengliang Guan 已提交
213 214 215 216
  for (int32_t index = 0; index < pCfg->replica; ++index) {
    const SNodeInfo *pNodeInfo = pCfg->nodeInfo + index;
    pNode->peerInfo[index] = syncAddPeer(pNode, pNodeInfo);
    if (pNode->peerInfo[index] == NULL) {
S
TD-2153  
Shengliang Guan 已提交
217 218
      sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId,
             pNodeInfo->nodeFqdn, pNodeInfo->nodePort);
S
TD-2051  
Shengliang Guan 已提交
219 220 221 222
      syncStop(pNode->rid);
      exit(1);
    }

R
TD-1382  
root 已提交
223
    if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
S
Shengliang Guan 已提交
224
      pNode->selfIndex = index;
R
TD-1382  
root 已提交
225
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226 227 228 229 230
  }

  if (pNode->selfIndex < 0) {
    sInfo("vgId:%d, this node is not configured", pNode->vgId);
    terrno = TSDB_CODE_SYN_INVALID_CONFIG;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231 232
    syncStop(pNode->rid);
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233 234
  }

S
TD-1617  
Shengliang Guan 已提交
235
  nodeVersion = pInfo->version;  // set the initial version
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236
  nodeRole = (pNode->replica > 1) ? TAOS_SYNC_ROLE_UNSYNCED : TAOS_SYNC_ROLE_MASTER;
S
TD-1617  
Shengliang Guan 已提交
237 238
  sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
        syncRole[nodeRole]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239

S
TD-1617  
Shengliang Guan 已提交
240
  pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241 242 243
  if (pNode->pSyncFwds == NULL) {
    sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
    terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244 245
    syncStop(pNode->rid);
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246 247
  }

S
TD-2157  
Shengliang Guan 已提交
248
  pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249
  if (pNode->pFwdTimer == NULL) {
S
TD-2157  
Shengliang Guan 已提交
250 251 252 253 254 255 256 257
    sError("vgId:%d, failed to allocate fwd timer", pNode->vgId);
    syncStop(pNode->rid);
    return -1;
  }

  pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
  if (pNode->pRoleTimer == NULL) {
    sError("vgId:%d, failed to allocate role timer", pNode->vgId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258 259
    syncStop(pNode->rid);
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260 261 262
  }

  syncAddArbitrator(pNode);
S
Shengliang Guan 已提交
263
  taosHashPut(tsVgIdHash, &pNode->vgId, sizeof(int32_t), &pNode, sizeof(SSyncNode *));
R
TD-1382  
root 已提交
264 265

  if (pNode->notifyRole) {
266
    (*pNode->notifyRole)(pNode->vgId, nodeRole);
R
TD-1382  
root 已提交
267
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
268

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269
  return pNode->rid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
270 271
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272
void syncStop(int64_t rid) {
R
TD-1382  
root 已提交
273
  SSyncPeer *pPeer;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275 276
  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
  if (pNode == NULL) return;
277

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
278 279
  sInfo("vgId:%d, cleanup sync", pNode->vgId);

S
Shengliang Guan 已提交
280
  pthread_mutex_lock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281

S
Shengliang Guan 已提交
282
  if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283
  if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
S
TD-2157  
Shengliang Guan 已提交
284
  if (pNode->pRoleTimer) taosTmrStop(pNode->pRoleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285

S
Shengliang Guan 已提交
286 287
  for (int32_t index = 0; index < pNode->replica; ++index) {
    pPeer = pNode->peerInfo[index];
R
TD-1382  
root 已提交
288
    if (pPeer) syncRemovePeer(pPeer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289 290 291 292 293
  }

  pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
  if (pPeer) syncRemovePeer(pPeer);

S
TD-2157  
Shengliang Guan 已提交
294
  pthread_mutex_unlock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
295

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296 297
  taosReleaseRef(tsSyncRefId, rid);
  taosRemoveRef(tsSyncRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
298 299
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
300
int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
S
Shengliang Guan 已提交
301
  int32_t i, j;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
302

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
303 304
  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
  if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
305

R
TD-1382  
root 已提交
306 307
  sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
        pNode->replica);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308

S
Shengliang Guan 已提交
309
  pthread_mutex_lock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
310 311 312

  for (i = 0; i < pNode->replica; ++i) {
    for (j = 0; j < pNewCfg->replica; ++j) {
R
TD-1382  
root 已提交
313 314
      if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) &&
          (pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
315 316 317 318 319 320 321 322 323 324 325 326 327 328
        break;
    }

    if (j >= pNewCfg->replica) {
      syncRemovePeer(pNode->peerInfo[i]);
      pNode->peerInfo[i] = NULL;
    }
  }

  SSyncPeer *newPeers[TAOS_SYNC_MAX_REPLICA];
  for (i = 0; i < pNewCfg->replica; ++i) {
    const SNodeInfo *pNewNode = &pNewCfg->nodeInfo[i];

    for (j = 0; j < pNode->replica; ++j) {
R
TD-1382  
root 已提交
329 330
      if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) &&
          (pNode->peerInfo[j]->port == pNewNode->nodePort))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
331 332 333 334 335 336 337 338 339
        break;
    }

    if (j >= pNode->replica) {
      newPeers[i] = syncAddPeer(pNode, pNewNode);
    } else {
      newPeers[i] = pNode->peerInfo[j];
    }

R
TD-1382  
root 已提交
340
    if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
341
      pNode->selfIndex = i;
R
TD-1382  
root 已提交
342
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343 344 345 346
  }

  pNode->replica = pNewCfg->replica;
  pNode->quorum = pNewCfg->quorum;
347
  if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348 349
  memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica);

R
TD-1382  
root 已提交
350
  for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
351
    pNode->peerInfo[i] = NULL;
R
TD-1382  
root 已提交
352
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
353 354 355 356 357 358

  syncAddArbitrator(pNode);

  if (pNewCfg->replica <= 1) {
    sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId);
    nodeRole = TAOS_SYNC_ROLE_MASTER;
359
    (*pNode->notifyRole)(pNode->vgId, nodeRole);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
360 361
  }

S
TD-2157  
Shengliang Guan 已提交
362
  pthread_mutex_unlock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
363

S
TD-2157  
Shengliang Guan 已提交
364
  sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
365 366
  syncBroadcastStatus(pNode);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
367
  taosReleaseRef(tsSyncRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368 369 370
  return 0;
}

S
Shengliang Guan 已提交
371
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
372 373
  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
  if (pNode == NULL) return 0; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
374

375
  int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
376

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
377
  taosReleaseRef(tsSyncRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
378 379 380 381

  return code;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
382 383 384
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
  if (pNode == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
385

386 387 388
  SSyncPeer *pPeer = pNode->pMaster;
  if (pPeer && pNode->quorum > 1) {
    char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
389

390 391 392
    SSyncHead *pHead = (SSyncHead *)msg;
    pHead->type = TAOS_SMSG_FORWARD_RSP;
    pHead->len = sizeof(SFwdRsp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
393

394 395 396
    SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead));
    pFwdRsp->version = version;
    pFwdRsp->code = code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
397

S
Shengliang Guan 已提交
398
    int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
S
TD-2157  
Shengliang Guan 已提交
399
    int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
400

401
    if (retLen == msgLen) {
S
Shengliang Guan 已提交
402
      sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
403 404 405 406
    } else {
      sDebug("%s, failed to send forward ack, restart", pPeer->id);
      syncRestartConnection(pPeer);
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
407
  }
408

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
409
  taosReleaseRef(tsSyncRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
410 411
}

S
TD-2157  
Shengliang Guan 已提交
412
#if 0
// Disabled: force this node back to the unsynced state with version 0 and
// restart every connected peer.
void syncRecover(int64_t rid) {
  SSyncPeer *pPeer;

  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
  if (pNode == NULL) return;

  // to do: add a few lines to check if recover is OK
  // if take this node to unsync state, the whole system may not work

  nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
  (*pNode->notifyRole)(pNode->vgId, nodeRole);
  nodeVersion = 0;

  pthread_mutex_lock(&pNode->mutex);

  for (int32_t i = 0; i < pNode->replica; ++i) {
    pPeer = pNode->peerInfo[i];
    if (pPeer->peerFd >= 0) {
      syncRestartConnection(pPeer);
    }
  }

  pthread_mutex_unlock(&pNode->mutex);

  taosReleaseRef(tsSyncRefId, rid);
}
#endif
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
440

S
Shengliang Guan 已提交
441
// Copy this node's index and the nodeId/role of every replica peer into
// pNodesRole. Returns 0 on success, -1 when rid cannot be acquired.
// NOTE(review): peerInfo is read without taking pNode->mutex - confirm that is intended.
int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
  if (pNode == NULL) return -1;

  pNodesRole->selfIndex = pNode->selfIndex;
  for (int32_t i = 0; i < pNode->replica; ++i) {
    pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
    pNodesRole->role[i] = pNode->peerInfo[i]->role;
  }

  taosReleaseRef(tsSyncRefId, rid);

  return 0;
}

S
Shengliang Guan 已提交
456
// Bring the arbitrator slot (peerInfo[TAOS_SYNC_MAX_REPLICA]) in line with the
// tsArbitrator setting: remove the peer when no arbitrator is configured, keep
// it when fqdn and port are unchanged, otherwise replace it with a new peer.
static void syncAddArbitrator(SSyncNode *pNode) {
  SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];

  // if not configured, return right away
  if (tsArbitrator[0] == 0) {
    if (pPeer) syncRemovePeer(pPeer);
    pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL;
    return;
  }

  // zero-initialised so no field is left indeterminate; nodeId stays 0 for the arbitrator
  SNodeInfo nodeInfo = {0};
  nodeInfo.nodeId = 0;
  int32_t ret = taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort);
  if (-1 == ret) {
    // fall back to the configured arbitrator port
    nodeInfo.nodePort = tsArbitratorPort;
  }

  if (pPeer) {
    if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) {
      return;
    } else {
      syncRemovePeer(pPeer);
      pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL;
    }
  }

  pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
}

485 486
static void syncFreeNode(void *param) {
  SSyncNode *pNode = param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
487

488
  pthread_mutex_destroy(&pNode->mutex);
S
TD-1848  
Shengliang Guan 已提交
489 490 491
  tfree(pNode->pRecv);
  tfree(pNode->pSyncFwds);
  tfree(pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
492 493
}

S
TD-2157  
Shengliang Guan 已提交
494
// Take one more reference on a peer by atomically incrementing its refCount.
void syncAddPeerRef(SSyncPeer *pPeer) {
  atomic_add_fetch_32(&pPeer->refCount, 1);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
495

S
Shengliang Guan 已提交
496
// Drop one reference on a peer. When the count reaches zero, the reference the
// peer holds on its sync node (taken in syncAddPeer) is released and the peer
// is freed.
// Returns 0 if the peer was freed, 1 if it is still referenced.
int32_t syncDecPeerRef(SSyncPeer *pPeer) {
  if (atomic_sub_fetch_32(&pPeer->refCount, 1) == 0) {
    taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);

    sDebug("%s, resource is freed", pPeer->id);
    tfree(pPeer);
    return 0;
  }

  return 1;
}

S
Shengliang Guan 已提交
508
// Tear down a peer's connections: stop its timer, close the sync fd and, if a
// peer connection is open, mark it closed and free the TCP connection object.
static void syncClosePeerConn(SSyncPeer *pPeer) {
  sDebug("%s, pfd:%d sfd:%d will be closed", pPeer->id, pPeer->peerFd, pPeer->syncFd);

  taosTmrStopA(&pPeer->timer);
  taosClose(pPeer->syncFd);
  if (pPeer->peerFd >= 0) {
    // mark the fd invalid before the connection is freed
    pPeer->peerFd = -1;
    taosFreeTcpConn(pPeer->pConn);
  }
}

S
Shengliang Guan 已提交
519
// Remove a peer: clear its ip, close its connections and drop the reference
// it was created with (the peer is freed once no references remain).
static void syncRemovePeer(SSyncPeer *pPeer) {
  sInfo("%s, it is removed", pPeer->id);

  pPeer->ip = 0;
  syncClosePeerConn(pPeer);
  syncDecPeerRef(pPeer);
}

S
Shengliang Guan 已提交
527
// Allocate and initialise a peer for the given node info.
//
// The peer starts offline with no open fds and a reference count of 1, and it
// takes a reference on its sync node (released in syncDecPeerRef). A connection
// check timer is armed when the peer is the arbitrator (nodeId == 0) or when
// its fqdn/port compares greater than the local fqdn/port.
//
// Returns the new peer, or NULL when the fqdn cannot be resolved (terrno is
// set to TSDB_CODE_RPC_FQDN_ERROR) or memory cannot be allocated.
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
  uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
  if (ip == 0xFFFFFFFF) {
    sError("failed to add peer, can't resolve fqdn:%s since %s", pInfo->nodeFqdn, strerror(errno));
    terrno = TSDB_CODE_RPC_FQDN_ERROR;
    return NULL;
  }

  SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer));
  if (pPeer == NULL) return NULL;

  pPeer->nodeId = pInfo->nodeId;
  tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn));
  pPeer->ip = ip;
  pPeer->port = pInfo->nodePort;
  pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
  snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, nodeId:%d", pNode->vgId, pPeer->nodeId);

  pPeer->peerFd = -1;
  pPeer->syncFd = -1;
  pPeer->role = TAOS_SYNC_ROLE_OFFLINE;
  pPeer->pSyncNode = pNode;
  pPeer->refCount = 1;

  sInfo("%s, it is configured, ep:%s:%u", pPeer->id, pPeer->fqdn, pPeer->port);
  int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
  if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
    // delay is 100..190 ms depending on vgId; vgroups other than the first wait
    // an extra status interval
    int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
    if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs;
    sDebug("%s, check peer connection after %d ms", pPeer->id, checkMs);
    taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer);
  }

  // the peer keeps its sync node alive until syncDecPeerRef() frees the peer
  taosAcquireRef(tsSyncRefId, pNode->rid);
  return pPeer;
}

S
Shengliang Guan 已提交
564
// Send a SYNC_STATUS_BROADCAST status message, each with a fresh transaction
// id, to every replica peer except this node itself.
void syncBroadcastStatus(SSyncNode *pNode) {
  for (int32_t index = 0; index < pNode->replica; ++index) {
    if (index == pNode->selfIndex) continue;
    SSyncPeer *pPeer = pNode->peerInfo[index];
    syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_BROADCAST, syncGenTranId());
  }
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
571

572
// Reset the retrieve counter of every replica peer and, if a flow-control
// callback is registered, notify it with level 0.
static void syncResetFlowCtrl(SSyncNode *pNode) {
  for (int32_t index = 0; index < pNode->replica; ++index) {
    pNode->peerInfo[index]->numOfRetrieves = 0;
  }

  if (pNode->notifyFlowCtrl) {
    (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
  }
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
582 583
// Pick the replica that should become master.
//
// If every replica is online, the one with the highest version wins (lowest
// index on a tie). Otherwise, if more than half of the nodes (counting an
// online arbitrator) are online, the slave/master replica with the highest
// version wins. When the winner is this node it switches to master, waits
// SYNC_WAIT_AFTER_CHOOSE_MASTER, resets flow control and notifies the role
// callback; when it is another peer, that is only logged.
//
// NOTE: nodeRole is not declared in this part of the file; presumably it is a
// macro that expands through the local `pNode` - keep that name.
static void syncChooseMaster(SSyncNode *pNode) {
  SSyncPeer *pPeer;
  int32_t    onlineNum = 0;
  int32_t    index = -1;
  int32_t    replica = pNode->replica;

  for (int32_t i = 0; i < pNode->replica; ++i) {
    if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
      onlineNum++;
    }
  }

  if (onlineNum == pNode->replica) {
    // if all peers are online, peer with highest version shall be master
    index = 0;
    for (int32_t i = 1; i < pNode->replica; ++i) {
      if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) {
        index = i;
      }
    }
  }

  // add arbitrator connection
  SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
  if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) {
    onlineNum++;
    replica = pNode->replica + 1;
  }

  if (index < 0 && onlineNum > replica / 2.0) {
    // over half of nodes are online
    for (int32_t i = 0; i < pNode->replica; ++i) {
      // slave with highest version shall be master
      pPeer = pNode->peerInfo[i];
      if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
        if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) {
          index = i;
        }
      }
    }
  }

  if (index >= 0) {
    if (index == pNode->selfIndex) {
      sInfo("vgId:%d, start to work as master", pNode->vgId);
      nodeRole = TAOS_SYNC_ROLE_MASTER;

      // Wait for other nodes to receive status to avoid version inconsistency
      taosMsleep(SYNC_WAIT_AFTER_CHOOSE_MASTER);

      syncResetFlowCtrl(pNode);
      (*pNode->notifyRole)(pNode->vgId, nodeRole);
    } else {
      pPeer = pNode->peerInfo[index];
      sInfo("%s, it shall work as master", pPeer->id);
    }
  } else {
    sDebug("vgId:%d, failed to choose master", pNode->vgId);
  }
}

// Determine the current master among the replicas.
//
// If half or fewer of the nodes (counting an online arbitrator) are online,
// this node falls back to the unsynced state and no master is reported.
// Otherwise the first replica whose role is master is selected; if a further
// master is found while the selected one is this node, this node steps down
// to slave.
//
// Returns the selected peer, or NULL when there is none.
// NOTE(review): after stepping down, the returned peer is still this node's
// own entry - confirm callers expect that.
//
// NOTE: nodeRole is not declared in this part of the file; presumably it is a
// macro that expands through the local `pNode` - keep that name.
static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
  int32_t onlineNum = 0;
  int32_t masterIndex = -1;
  int32_t replica = pNode->replica;

  for (int32_t index = 0; index < pNode->replica; ++index) {
    if (pNode->peerInfo[index]->role != TAOS_SYNC_ROLE_OFFLINE) {
      onlineNum++;
    }
  }

  // add arbitrator connection
  SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
  if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) {
    onlineNum++;
    replica = pNode->replica + 1;
  }

  if (onlineNum <= replica * 0.5) {
    if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
      nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
      (*pNode->notifyRole)(pNode->vgId, nodeRole);
      sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
    }
  } else {
    for (int32_t index = 0; index < pNode->replica; ++index) {
      SSyncPeer *pTemp = pNode->peerInfo[index];
      if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue;
      if (masterIndex < 0) {
        masterIndex = index;
      } else {  // multiple masters, it shall not happen
        if (masterIndex == pNode->selfIndex) {
          sError("%s, peer is master, work as slave instead", pTemp->id);
          nodeRole = TAOS_SYNC_ROLE_SLAVE;
          (*pNode->notifyRole)(pNode->vgId, nodeRole);
        }
      }
    }
  }

  SSyncPeer *pMaster = (masterIndex >= 0) ? pNode->peerInfo[masterIndex] : NULL;
  return pMaster;
}

S
Shengliang Guan 已提交
687
// Check that this node may remain master relative to pPeer.
//
// If this node is master but pPeer reports a higher version, this node drops
// to the unsynced state, notifies the role callback and restarts every other
// replica peer.
//
// Returns -1 when that demotion happened, 0 otherwise.
//
// NOTE: nodeRole / nodeVersion are not declared in this part of the file;
// presumably they are macros that expand through the local `pNode` - keep that name.
static int32_t syncValidateMaster(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  int32_t code = 0;

  if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
    sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version);
    nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
    (*pNode->notifyRole)(pNode->vgId, nodeRole);
    code = -1;

    for (int32_t index = 0; index < pNode->replica; ++index) {
      if (index == pNode->selfIndex) continue;
      syncRestartPeer(pNode->peerInfo[index]);
    }
  }

  return code;
}

S
TD-2157  
Shengliang Guan 已提交
706
// Records the role just reported for pPeer and re-evaluates this node's role.
//   pPeer        peer whose role changed
//   peersStatus  the peer's view of every replica, or NULL when unavailable
//   newPeerRole  role to store for pPeer
// When a master is known the node either schedules a recovery from it or
// becomes slave; when no master is known a master is chosen only if the local
// and reported views agree. A role change on either side is broadcast, and
// flow control is reset whenever this node does not end up as master.
static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t newPeerRole) {
  SSyncNode *pNode = pPeer->pSyncNode;
  int8_t     prevPeerRole = pPeer->role;
  int8_t     prevSelfRole = nodeRole;
  int8_t     needSync = 0;

  pPeer->role = newPeerRole;
  sDebug("%s, peer role:%s change to %s", pPeer->id, syncRole[prevPeerRole], syncRole[newPeerRole]);

  SSyncPeer *pMaster = syncCheckMaster(pNode);

  if (pMaster != NULL) {
    pNode->pMaster = pMaster;
    sDebug("%s, it is the master, sver:%" PRIu64, pMaster->id, pMaster->version);

    // a failed validation has already restarted the peers; skip broadcast and flow-control reset
    if (syncValidateMaster(pPeer) < 0) return;

    if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) {
      if (nodeVersion >= pMaster->version) {
        sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
        nodeRole = TAOS_SYNC_ROLE_SLAVE;
        pNode->notifyRole(pNode->vgId, nodeRole);
      } else {
        sDebug("%s, is master, sync required, self sver:%" PRIu64, pMaster->id, nodeVersion);
        needSync = 1;
      }
    } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
      sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
    }
  } else {
    // no master: pick one only when the local view matches the view the peer reported
    int32_t agreed;

    if (peersStatus == NULL) {
      agreed = (pNode->replica == 2) ? 1 : 0;
    } else {
      agreed = 1;
      for (int32_t i = 0; i < pNode->replica; ++i) {
        SSyncPeer *pLocal = pNode->peerInfo[i];
        if (pLocal->role != peersStatus[i].role ||
            (pLocal->role != TAOS_SYNC_ROLE_OFFLINE && pLocal->version != peersStatus[i].version)) {
          agreed = 0;
          break;
        }
      }
    }

    if (agreed) {
      sDebug("vgId:%d, choose master", pNode->vgId);
      syncChooseMaster(pNode);
    } else {
      sDebug("vgId:%d, cannot choose master since roles inequality", pNode->vgId);
    }
  }

  if (needSync) {
    syncRecoverFromMaster(pMaster);
  }

  if (prevPeerRole != newPeerRole || nodeRole != prevSelfRole) {
    sDebug("vgId:%d, roles changed, broadcast status", pNode->vgId);
    syncBroadcastStatus(pNode);
  }

  if (nodeRole != TAOS_SYNC_ROLE_MASTER) {
    syncResetFlowCtrl(pNode);
  }
}

// Closes the connection to pPeer and resets its sync status to init. A
// reconnect timer (syncCheckPeerConnection) is armed only when the peer's
// fqdn compares greater than this node's, or the fqdns are equal and the
// peer's port is greater than tsSyncPort.
static void syncRestartPeer(SSyncPeer *pPeer) {
  sDebug("%s, restart peer connection, last sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);

  syncClosePeerConn(pPeer);

  pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
  sDebug("%s, peer conn is restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);

  int32_t fqdnOrder = strcmp(pPeer->fqdn, tsNodeFqdn);
  int32_t peerRanksHigher = (fqdnOrder > 0) || (fqdnOrder == 0 && pPeer->port > tsSyncPort);
  if (!peerRanksHigher) return;

  sDebug("%s, check peer connection in 1000 ms", pPeer->id);
  taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
}

S
Shengliang Guan 已提交
789
// Restarts the connection to pPeer and re-runs the role check with the peer
// marked offline. Does nothing for a peer whose ip is 0.
void syncRestartConnection(SSyncPeer *pPeer) {
  if (pPeer->ip != 0) {
    syncRestartPeer(pPeer);
    syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE);
  }
}

S
Shengliang Guan 已提交
796
// Handles a sync request received from pPeer by starting a detached thread
// that runs syncRetrieveData for that peer.
//   msg    message body (not used by this function)
//   pPeer  peer that asked to be synced
// The request is ignored when the peer has no ip, when this node is not the
// master (the peer's sync fd is closed in that case), or when the peer's sync
// status is no longer init.
static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  sDebug("%s, sync-req is received", pPeer->id);

  if (pPeer->ip == 0) return;

  if (nodeRole != TAOS_SYNC_ROLE_MASTER) {
    sError("%s, I am not master anymore", pPeer->id);
    taosClose(pPeer->syncFd);
    return;
  }

  if (pPeer->sstatus != TAOS_SYNC_STATUS_INIT) {
    sDebug("%s, sync is already started for sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
    return;  // already started
  }

  // the reference taken here belongs to the new thread; it is dropped below if creation fails
  syncAddPeerRef(pPeer);

  pthread_attr_t thattr;
  pthread_t      thread;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
  int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer);
  pthread_attr_destroy(&thattr);

  if (ret != 0) {
    // pthread_create reports failure through its return value, not through errno
    sError("%s, failed to create sync thread since %s", pPeer->id, strerror(ret));
    syncDecPeerRef(pPeer);
  } else {
    pPeer->sstatus = TAOS_SYNC_STATUS_START;
    sDebug("%s, thread is created to retrieve data, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
  }
}

S
Shengliang Guan 已提交
831
static void syncNotStarted(void *param, void *tmrId) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
832 833 834
  SSyncPeer *pPeer = param;
  SSyncNode *pNode = pPeer->pSyncNode;

S
Shengliang Guan 已提交
835
  pthread_mutex_lock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
836
  pPeer->timer = NULL;
S
TD-2157  
Shengliang Guan 已提交
837 838
  pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
  sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
839
  syncRestartConnection(pPeer);
S
TD-2157  
Shengliang Guan 已提交
840
  pthread_mutex_unlock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
841 842 843
}

static void syncTryRecoverFromMaster(void *param, void *tmrId) {
R
TD-1382  
root 已提交
844 845
  SSyncPeer *pPeer = param;
  SSyncNode *pNode = pPeer->pSyncNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
846

S
Shengliang Guan 已提交
847
  pthread_mutex_lock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
848
  syncRecoverFromMaster(pPeer);
S
TD-2157  
Shengliang Guan 已提交
849
  pthread_mutex_unlock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
850 851
}

S
Shengliang Guan 已提交
852 853
// Sends a sync request to pPeer so this node can recover its data from it.
// Nothing is sent when this node's sync status is not init. When the number of
// running syncs has reached tsMaxSyncNum the attempt is postponed through a
// timer; vgId 1 is exempt from that limit (the original comment says this
// keeps the mnode sync from being interrupted). A syncNotStarted timer is
// armed before the request is written.
static void syncRecoverFromMaster(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;

  if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
    sDebug("%s, sync is already started since sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
    return;
  }

  taosTmrStopA(&pPeer->timer);

  int32_t overLimit = (pNode->vgId != 1) && (tsSyncNum >= tsMaxSyncNum);
  if (overLimit) {
    int32_t retryMs = 500 + (pNode->vgId * 10) % 200;
    sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
    taosTmrReset(syncTryRecoverFromMaster, retryMs, pPeer, tsSyncTmrCtrl, &pPeer->timer);
    return;
  }

  sDebug("%s, try to sync", pPeer->id);

  // memset rather than an initializer so every byte written to the socket is zeroed
  SFirstPkt firstPkt;
  memset(&firstPkt, 0, sizeof(firstPkt));
  firstPkt.syncHead.vgId = pNode->vgId;
  firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
  firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
  firstPkt.port = tsSyncPort;
  tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));

  taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);

  if (taosWriteMsg(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
    nodeSStatus = TAOS_SYNC_STATUS_START;
    sInfo("%s, sync-req is sent to peer, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
  } else {
    sError("%s, failed to send sync-req to peer", pPeer->id);
  }
}

S
Shengliang Guan 已提交
888
// Handles a forward response from a peer: finds the pending forward entry
// whose version equals the one in the response and records the ack for it.
//   cont   message body, interpreted as an SFwdRsp
//   pPeer  peer that sent the response
// A response that matches no pending entry is ignored. Previously the ack was
// applied to whichever entry the search happened to stop on.
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  SFwdRsp *  pFwdRsp = (SFwdRsp *)cont;
  SSyncFwds *pSyncFwds = pNode->pSyncFwds;

  sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);

  // nothing pending, or the response is older than the oldest pending entry
  SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
  if (pSyncFwds->fwds <= 0 || pFirst->version > pFwdRsp->version) return;

  // scan the ring from the oldest entry
  for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
    SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
    if (pFwdRsp->version != pFwdInfo->version) continue;

    syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
    syncRemoveConfirmedFwdInfo(pNode);
    return;
  }
}

S
Shengliang Guan 已提交
909
// Handles a forwarded WAL record received from a peer.
//   cont   message body, interpreted as an SWalHead
//   pPeer  peer that forwarded the record
// A slave hands the record to writeToCache. Any other role buffers it while a
// sync is in progress (sync status not init) and discards it otherwise.
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  SWalHead * pHead = (SWalHead *)cont;

  sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);

  if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
    pNode->writeToCache(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
    return;
  }

  if (nodeSStatus == TAOS_SYNC_STATUS_INIT) {
    sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
           pHead->version);
    return;
  }

  syncSaveIntoBuffer(pPeer, pHead);
}

S
Shengliang Guan 已提交
928 929
// Handles a status message from a peer: stores the peer's reported version,
// re-runs the role check with the reported role and peer table, and answers
// with this node's own status when the sender asked for an ack.
//   cont   message body, interpreted as an SPeersStatus
//   pPeer  peer that sent the status
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
  SSyncNode *   pNode = pPeer->pSyncNode;
  SPeersStatus *pStatus = (SPeersStatus *)cont;

  sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
         pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pStatus->role],
         pStatus->version, pStatus->ack, pStatus->tranId, statusType[pStatus->type], pPeer->peerFd);

  pPeer->version = pStatus->version;
  syncCheckRole(pPeer, pStatus->peersStatus, pStatus->role);

  if (!pStatus->ack) return;

  // the reply carries ack = 0, the same tranId and type + 1
  syncSendPeersStatusMsgToPeer(pPeer, 0, pStatus->type + 1, pStatus->tranId);
}

S
Shengliang Guan 已提交
944
// Reads one message (head, then body) from the peer's connection.
//   pPeer  peer to read from
//   pHead  receives the message head
//   cont   receives the pHead->len bytes that follow the head
// Returns 0 on success and -1 when the peer has no open fd, when either read
// comes up short, or when the advertised body length is negative or larger
// than TSDB_MAX_WAL_SIZE.
static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
  if (pPeer->peerFd < 0) return -1;

  int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
  if (hlen != sizeof(SSyncHead)) {
    sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen);
    return -1;
  }

  // The length comes from the network, so it is validated at run time. An
  // assert would vanish under NDEBUG and let an oversized value drive the read
  // into cont.
  // NOTE(review): assumes cont holds at least TSDB_MAX_WAL_SIZE bytes, as the
  // original assert implied; confirm against the buffer supplied by the caller.
  if (pHead->len < 0 || pHead->len > TSDB_MAX_WAL_SIZE) {
    sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len);
    return -1;
  }

  int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
  if (bytes != pHead->len) {
    sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
969
static int32_t syncProcessPeerMsg(void *param, void *buffer) {
R
TD-1382  
root 已提交
970 971
  SSyncPeer *pPeer = param;
  SSyncHead  head;
S
TD-1617  
Shengliang Guan 已提交
972
  char *     cont = buffer;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
973 974

  SSyncNode *pNode = pPeer->pSyncNode;
S
Shengliang Guan 已提交
975
  pthread_mutex_lock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
976

S
Shengliang Guan 已提交
977
  int32_t code = syncReadPeerMsg(pPeer, &head, cont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
978 979 980 981 982 983 984 985 986 987 988 989 990

  if (code == 0) {
    if (head.type == TAOS_SMSG_FORWARD) {
      syncProcessForwardFromPeer(cont, pPeer);
    } else if (head.type == TAOS_SMSG_FORWARD_RSP) {
      syncProcessFwdResponse(cont, pPeer);
    } else if (head.type == TAOS_SMSG_SYNC_REQ) {
      syncProcessSyncRequest(cont, pPeer);
    } else if (head.type == TAOS_SMSG_STATUS) {
      syncProcessPeersStatusMsg(cont, pPeer);
    }
  }

S
TD-2157  
Shengliang Guan 已提交
991
  pthread_mutex_unlock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
992 993 994 995

  return code;
}

R
TD-1382  
root 已提交
996
// Size in bytes of a full status message: sync head, SPeersStatus, and one
// SPeerStatus slot per possible replica. Parenthesized so the sum stays intact
// whatever operators surround the macro at the point of use.
#define statusMsgLen (sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA)
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
997

S
Shengliang Guan 已提交
998
// Builds a status message describing this node and its view of every replica,
// and writes it to pPeer.
//   pPeer   destination peer; nothing is sent when it has no open fd or no ip
//   ack     copied into the message's ack field
//   type    copied into the message's type field
//   tranId  copied into the message's tranId field
// A short write restarts the connection to the peer.
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
  SSyncNode *pNode = pPeer->pSyncNode;

  if (pPeer->peerFd < 0 || pPeer->ip == 0) return;

  char          msg[statusMsgLen] = {0};
  SSyncHead *   pHead = (SSyncHead *)msg;
  SPeersStatus *pStatus = (SPeersStatus *)(msg + sizeof(SSyncHead));

  pHead->type = TAOS_SMSG_STATUS;
  pHead->len = statusMsgLen - sizeof(SSyncHead);

  pStatus->version = nodeVersion;
  pStatus->role = nodeRole;
  pStatus->ack = ack;
  pStatus->type = type;
  pStatus->tranId = tranId;

  for (int32_t idx = 0; idx < pNode->replica; ++idx) {
    SSyncPeer *pEntry = pNode->peerInfo[idx];
    pStatus->peersStatus[idx].role = pEntry->role;
    pStatus->peersStatus[idx].version = pEntry->version;
  }

  int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, statusMsgLen);
  if (retLen != statusMsgLen) {
    sDebug("%s, failed to send status msg, restart", pPeer->id);
    syncRestartConnection(pPeer);
    return;
  }

  sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
         pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
         syncStatus[pPeer->sstatus], pPeer->version, pStatus->ack, pStatus->tranId,
         statusType[pStatus->type], pPeer->peerFd);
}

// Makes sure there is a connection to pPeer. With an fd already open it only
// sends this node's status, requesting an ack. Otherwise it opens a TCP socket
// and sends the first packet; on success the fd is registered with the TCP
// pool and the peer is marked unsynced, and on any failure a
// syncCheckPeerConnection timer is armed to try again.
static void syncSetupPeerConnection(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;

  taosTmrStopA(&pPeer->timer);

  if (pPeer->peerFd >= 0) {
    sDebug("%s, send role version to peer", pPeer->id);
    syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_SETUP_CONN, syncGenTranId());
    return;
  }

  int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
  if (connFd < 0) {
    sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
    taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
    return;
  }

  // memset rather than an initializer so every byte written to the socket is zeroed
  SFirstPkt pkt;
  memset(&pkt, 0, sizeof(pkt));
  pkt.syncHead.type = TAOS_SMSG_STATUS;
  pkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0;
  pkt.sourceId = pNode->vgId;  // tell arbitrator its vgId
  pkt.port = tsSyncPort;
  tstrncpy(pkt.fqdn, tsNodeFqdn, sizeof(pkt.fqdn));

  if (taosWriteMsg(connFd, &pkt, sizeof(pkt)) != sizeof(pkt)) {
    sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
    taosClose(connFd);
    taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
    return;
  }

  sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d", pPeer->id, connFd, pPeer->syncFd);
  pPeer->peerFd = connFd;
  pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
  pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
  syncAddPeerRef(pPeer);
}

S
Shengliang Guan 已提交
1071
static void syncCheckPeerConnection(void *param, void *tmrId) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1072 1073 1074
  SSyncPeer *pPeer = param;
  SSyncNode *pNode = pPeer->pSyncNode;

S
Shengliang Guan 已提交
1075
  pthread_mutex_lock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1076 1077

  sDebug("%s, check peer connection", pPeer->id);
R
TD-1382  
root 已提交
1078
  syncSetupPeerConnection(pPeer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1079

S
TD-2157  
Shengliang Guan 已提交
1080
  pthread_mutex_unlock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1081 1082
}

S
Shengliang Guan 已提交
1083
// Stops the peer's timer and starts a detached thread that runs
// syncRestoreData for pPeer. A peer reference is taken for the thread; if the
// thread cannot be created the peer's sync fd is closed and the reference is
// dropped again.
static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
  taosTmrStopA(&pPeer->timer);

  pthread_attr_t thattr;
  pthread_t      thread;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);

  syncAddPeerRef(pPeer);
  int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer);
  pthread_attr_destroy(&thattr);

  // pthread_create returns 0 on success and a positive error number on
  // failure, so a "< 0" test would never detect a failure
  if (ret != 0) {
    sError("%s, failed to create sync thread", pPeer->id);
    taosClose(pPeer->syncFd);
    syncDecPeerRef(pPeer);
  } else {
    sInfo("%s, sync connection is up", pPeer->id);
  }
}

S
Shengliang Guan 已提交
1104 1105 1106
// Accepts a new inbound TCP connection from a peer.
//   connFd    accepted socket
//   sourceIp  remote address, used only for logging
// Reads the first packet, finds the sync node by the vgId it carries and the
// configured peer by fqdn and port. A TAOS_SMSG_SYNC_DATA packet turns the
// socket into the peer's sync fd and starts the restore thread; any other type
// replaces the peer's current connection with this socket and sends this
// node's status. The socket is closed when the packet cannot be read, the vgId
// is unknown, or the sender is not a configured peer.
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
  char ipstr[24];

  tinet_ntoa(ipstr, sourceIp);
  sDebug("peer TCP connection from ip:%s", ipstr);

  SFirstPkt firstPkt;
  if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
    sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno));
    taosCloseSocket(connFd);
    return;
  }

  // the packet comes straight off the wire: force termination before fqdn is
  // handed to strcmp() and "%s"
  firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0;

  int32_t     vgId = firstPkt.syncHead.vgId;
  SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
  if (ppNode == NULL || *ppNode == NULL) {
    sError("vgId:%d, vgId could not be found", vgId);
    taosCloseSocket(connFd);
    return;
  }

  SSyncNode *pNode = *ppNode;
  pthread_mutex_lock(&pNode->mutex);

  // find the configured peer that matches the sender's fqdn and port
  SSyncPeer *pPeer = NULL;
  for (int32_t i = 0; i < pNode->replica; ++i) {
    SSyncPeer *pTemp = pNode->peerInfo[i];
    if (pTemp && (strcmp(pTemp->fqdn, firstPkt.fqdn) == 0) && (pTemp->port == firstPkt.port)) {
      pPeer = pTemp;
      break;
    }
  }

  if (pPeer == NULL) {
    sError("vgId:%d, peer:%s not configured", pNode->vgId, firstPkt.fqdn);
    taosCloseSocket(connFd);
  } else if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) {
    // first packet tells what kind of link
    pPeer->syncFd = connFd;
    syncCreateRestoreDataThread(pPeer);
  } else {
    sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd);
    syncClosePeerConn(pPeer);
    pPeer->peerFd = connFd;
    pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
    syncAddPeerRef(pPeer);
    sDebug("%s, ready to exchange data", pPeer->id);
    syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
  }

  pthread_mutex_unlock(&pNode->mutex);
}

static void syncProcessBrokenLink(void *param) {
  if (param == NULL) return;  // the connection for arbitrator
  SSyncPeer *pPeer = param;
  SSyncNode *pNode = pPeer->pSyncNode;

1164
  if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return;
S
Shengliang Guan 已提交
1165
  pthread_mutex_lock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1166

S
TD-2157  
Shengliang Guan 已提交
1167
  sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1168 1169 1170 1171 1172 1173
  pPeer->peerFd = -1;

  if (syncDecPeerRef(pPeer) != 0) {
    syncRestartConnection(pPeer);
  }

S
TD-2157  
Shengliang Guan 已提交
1174
  pthread_mutex_unlock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1175
  taosReleaseRef(tsSyncRefId, pNode->rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1176 1177
}

S
Shengliang Guan 已提交
1178
// Appends a pending-forward record to the node's ring of forward infos.
//   pNode    sync node owning the ring
//   version  version of the forwarded record
//   mhandle  handle stored in the entry (syncProcessFwdAck later passes it to
//            confirmForward)
// When the ring already holds tsMaxFwdInfo entries the oldest one is dropped
// to make room.
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
  SSyncFwds *pSyncFwds = pNode->pSyncFwds;
  int64_t    now = taosGetTimestampMs();

  // ring is full: give up the oldest slot
  if (pSyncFwds->fwds >= tsMaxFwdInfo) {
    pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
    pSyncFwds->fwds -= 1;
  }

  // an empty ring reuses the slot that `last` already points at
  if (pSyncFwds->fwds > 0) {
    pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo;
  }

  SFwdInfo *pSlot = &pSyncFwds->fwdInfo[pSyncFwds->last];
  memset(pSlot, 0, sizeof(SFwdInfo));
  pSlot->time = now;
  pSlot->mhandle = mhandle;
  pSlot->version = version;

  pSyncFwds->fwds += 1;
  sTrace("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
}

S
Shengliang Guan 已提交
1201
// Pops confirmed entries off the front of the node's forward-info ring and
// stops at the first entry that is not confirmed yet. Every removed slot is
// zeroed. When the ring becomes empty, `first` is moved onto `last`.
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
  SSyncFwds *pSyncFwds = pNode->pSyncFwds;

  while (pSyncFwds->fwds > 0) {
    SFwdInfo *pOldest = &pSyncFwds->fwdInfo[pSyncFwds->first];
    if (pOldest->confirmed == 0) return;

    pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
    pSyncFwds->fwds -= 1;
    if (pSyncFwds->fwds == 0) {
      pSyncFwds->first = pSyncFwds->last;
    }

    memset(pOldest, 0, sizeof(SFwdInfo));
  }
}

S
Shengliang Guan 已提交
1218
// Accounts for one ack or nack on a pending forward and confirms the forward
// once the outcome is decided.
//   pNode     sync node owning the forward
//   pFwdInfo  pending forward entry being acknowledged
//   code      0 counts as an ack, anything else as a nack; the first non-zero
//             code seen is kept in the entry
// The forward is confirmed (confirmForward is called once) when acks reach
// quorum - 1, or when nacks exceed replica - quorum.
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) {
  if (pFwdInfo->code == 0) pFwdInfo->code = code;

  int32_t decided;
  if (code == 0) {
    pFwdInfo->acks++;
    decided = (pFwdInfo->acks >= pNode->quorum - 1);
  } else {
    pFwdInfo->nacks++;
    decided = (pFwdInfo->nacks > pNode->replica - pNode->quorum);
  }

  if (!decided || pFwdInfo->confirmed != 0) return;

  sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
  pNode->confirmForward(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code);
  pFwdInfo->confirmed = 1;
}

S
TD-2157  
Shengliang Guan 已提交
1241 1242 1243 1244 1245 1246 1247 1248 1249
static void syncMonitorNodeRole(void *param, void *tmrId) {
  int64_t    rid = (int64_t)param;
  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
  if (pNode == NULL) return;

  for (int32_t index = 0; index < pNode->replica; index++) {
    if (index == pNode->selfIndex) continue;

    SSyncPeer *pPeer = pNode->peerInfo[index];
S
TD-2157  
Shengliang Guan 已提交
1250 1251
    if (/*pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && */ nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue;
    if (/*pPeer->sstatus > TAOS_SYNC_STATUS_INIT || */ nodeSStatus > TAOS_SYNC_STATUS_INIT) continue;
S
TD-2000  
Shengliang Guan 已提交
1252

S
TD-2157  
Shengliang Guan 已提交
1253 1254
    sDebug("%s, check roles since self:%s sstatus:%s, peer:%s sstatus:%s", pPeer->id, syncRole[pPeer->role],
           syncStatus[pPeer->sstatus], syncRole[nodeRole], syncStatus[nodeSStatus]);
S
TD-2000  
Shengliang Guan 已提交
1255
    syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId());
S
TD-2157  
Shengliang Guan 已提交
1256
    break;
S
TD-2157  
Shengliang Guan 已提交
1257 1258 1259 1260 1261 1262
  }

  pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
  taosReleaseRef(tsSyncRefId, rid);
}

S
Shengliang Guan 已提交
1263
static void syncMonitorFwdInfos(void *param, void *tmrId) {
S
TD-2157  
Shengliang Guan 已提交
1264
  int64_t    rid = (int64_t)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1265 1266
  SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
  if (pNode == NULL) return;
1267

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1268
  SSyncFwds *pSyncFwds = pNode->pSyncFwds;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1269

S
TD-1898  
Shengliang Guan 已提交
1270 1271
  if (pSyncFwds) {
    int64_t time = taosGetTimestampMs();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1272

1273
    if (pSyncFwds->fwds > 0) {
S
Shengliang Guan 已提交
1274
      pthread_mutex_lock(&pNode->mutex);
S
Shengliang Guan 已提交
1275
      for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
1276
        SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
S
TD-1898  
Shengliang Guan 已提交
1277 1278 1279 1280
        if (ABS(time - pFwdInfo->time) < 2000) break;

        sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
               pFwdInfo->version, time, pFwdInfo->time);
S
TD-1795  
Shengliang Guan 已提交
1281
        syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_SYN_CONFIRM_EXPIRED);
1282 1283 1284
      }

      syncRemoveConfirmedFwdInfo(pNode);
S
TD-2157  
Shengliang Guan 已提交
1285
      pthread_mutex_unlock(&pNode->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1286 1287
    }

S
TD-2157  
Shengliang Guan 已提交
1288
    pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
R
TD-1382  
root 已提交
1289
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1290

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1291
  taosReleaseRef(tsSyncRefId, rid);
1292
}
R
TD-1382  
root 已提交
1293

S
Shengliang Guan 已提交
1294
/*
 * Forward one WAL record to the eligible peers of a sync node.
 *
 * pNode    sync node that owns the record
 * data     points at the SWalHead of the record; the SSyncHead is written into
 *          the sizeof(SSyncHead) bytes immediately BEFORE this address, so the
 *          caller must have reserved that space (NOTE(review): confirm against
 *          the callers)
 * mhandle  opaque handle stored together with the forward info
 * qtype    queue type of the record; only TAOS_QTYPE_RPC and TAOS_QTYPE_CQ
 *          records are forwarded
 *
 * Returns:
 *   TSDB_CODE_SYN_INVALID_VERSION  the record version is more than one ahead of
 *                                  nodeVersion (a slave also restarts its peer
 *                                  connections in this case)
 *   1                              forward info was saved (quorum > 1 and at
 *                                  least one eligible peer exists)
 *   0                              otherwise, including when nothing has to be
 *                                  forwarded
 *
 * nodeVersion and nodeRole are not defined in this function; presumably they
 * are macros that resolve against pNode.
 */
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) {
  SSyncPeer *pPeer;
  SSyncHead *pSyncHead;
  SWalHead * pWalHead = data;
  int32_t    fwdLen;
  int32_t    code = 0;

  if (pWalHead->version > nodeVersion + 1) {
    sError("vgId:%d, hver:%" PRIu64 ", inconsistent with sver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion);
    if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
      sInfo("vgId:%d, restart connection", pNode->vgId);
      for (int32_t i = 0; i < pNode->replica; ++i) {
        pPeer = pNode->peerInfo[i];
        // a slot may be empty, as the forwarding loop below also allows for
        if (pPeer == NULL) continue;
        syncRestartConnection(pPeer);
      }
    }

    return TSDB_CODE_SYN_INVALID_VERSION;
  }

  // always update version
  sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
         syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
  nodeVersion = pWalHead->version;

  if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;

  // only pkt from RPC or CQ can be forwarded
  if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;

  // performance shortcut: build the sync head in place, directly in front of
  // the WAL head, so that both can be sent with one write and no extra copy
  pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
  pSyncHead->type = TAOS_SMSG_FORWARD;
  pSyncHead->pversion = 0;
  pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
  fwdLen = pSyncHead->len + sizeof(SSyncHead);  // include the WAL and SYNC head

  pthread_mutex_lock(&pNode->mutex);

  for (int32_t i = 0; i < pNode->replica; ++i) {
    pPeer = pNode->peerInfo[i];
    if (pPeer == NULL || pPeer->peerFd < 0) continue;
    if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;

    // save the forward info once, before the first write to an eligible peer
    if (pNode->quorum > 1 && code == 0) {
      syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
      code = 1;
    }

    int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
    if (retLen == fwdLen) {
      sTrace("%s, forward is sent, role:%s sstatus:%s hver:%" PRIu64 " contLen:%d", pPeer->id, syncRole[pPeer->role],
             syncStatus[pPeer->sstatus], pWalHead->version, pWalHead->len);
    } else {
      sError("%s, failed to forward, role:%s sstatus:%s hver:%" PRIu64 " retLen:%d", pPeer->id, syncRole[pPeer->role],
             syncStatus[pPeer->sstatus], pWalHead->version, retLen);
      syncRestartConnection(pPeer);
    }
  }

  pthread_mutex_unlock(&pNode->mutex);

  return code;
}
1358