syncSnapshot.c 31.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
//----------------------------------
25 26 27 28
static void    snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
static void    snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
static void    snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
M
Minghao Li 已提交
29

30
//----------------------------------
M
Minghao Li 已提交
31
// Allocate and initialise a snapshot sender bound to the replica at replicaIndex.
//
// Returns NULL when the FSM does not provide all three snapshot read callbacks,
// or when allocation fails (terrno is then set to TSDB_CODE_OUT_OF_MEMORY).
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // a sender is only usable when the whole read interface is implemented
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  memset(pNew, 0, sizeof(*pNew));

  int64_t nowMs = taosGetTimestampMs();

  // owner and target replica
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;

  // transfer state: nothing started, nothing sent, nothing acked
  pNew->start = false;
  pNew->finish = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // no reader and no cached block yet
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;

  // terms and timestamps; the private term is seeded from the creation time
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = nowMs + 100;
  pNew->startTime = nowMs;

  // record the snapshot info reported by the FSM
  pNew->pSyncNode->pFsm->FpGetSnapshotInfo(pNew->pSyncNode->pFsm, &(pNew->snapshot));

  return pNew;
}
M
Minghao Li 已提交
66

M
Minghao Li 已提交
67 68
// Release a snapshot sender: free its cached block, stop its reader if one is
// still attached, then free the sender itself. A NULL sender is ignored.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // drop the cached block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // stop the reader; a failure is logged but does not abort the teardown
  if (pSender->pReader != NULL) {
    int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    if (code != 0) {
      syncNodeErrorLog(pSender->pSyncNode, "stop reader error");
    }
    pSender->pReader = NULL;
  }

  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
88

M
Minghao Li 已提交
89 90
// Return the sender's start flag (true while a transfer is in progress).
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

91 92 93 94 95 96
// begin send snapshot by param, snapshot, pReader
//
// action:
// 1. assert reader not start
// 2. update state
// 3. send first snapshot block
97 98
// Begin a snapshot transfer and send the begin message (seq = SYNC_SNAPSHOT_SEQ_BEGIN, no data).
//
// pSender        sender; must not be started and must have no reader attached (asserted)
// snapshotParam  stored on the sender; its start field is sent as beginIndex
// snapshot       snapshot info stored on the sender
// pReader        reader handle stored on the sender for subsequent block reads
//
// Always returns 0.
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
                            void *pReader) {
  ASSERT(!snapshotSenderIsStart(pSender));

  // init snapshot, param, reader
  ASSERT(pSender->pReader == NULL);
  pSender->pReader = pReader;
  pSender->snapshot = snapshot;
  pSender->snapshotParam = snapshotParam;

  // init current block
  // the pointer is cleared after the free so that the later frees in
  // snapshotSend / snapshotSenderStop / snapshotSenderDestroy cannot double free it
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }
  pSender->blockLen = 0;

  // update term
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);  // increase private term

  // update state
  pSender->finish = false;
  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // init last config
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
    int32_t         code = 0;
    SSyncRaftEntry *pEntry = NULL;
    bool            getLastConfig = false;

    // first try to load the config entry from the log store
    code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                          pSender->snapshot.lastConfigIndex, &pEntry);
    if (code == 0 && pEntry != NULL) {
      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pEntry, &rpcMsg);

      SSyncCfg lastConfig;
      int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
      ASSERT(ret == 0);
      pSender->lastConfig = lastConfig;
      getLastConfig = true;

      rpcFreeCont(rpcMsg.pCont);
      syncEntryDestory(pEntry);
    } else {
      // fall back to the local raft cfg when it refers to the same config index
      if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
        sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
        pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
        getLastConfig = true;
      }
    }

    // last config not found in wal, update to -1
    if (!getLastConfig) {
      SyncIndex oldLastConfigIndex = pSender->snapshot.lastConfigIndex;
      SyncIndex newLastConfigIndex = SYNC_INDEX_INVALID;
      pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
      memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));

      // event log
      do {
        char logBuf[128];
        snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %" PRId64 " to %" PRId64,
                 oldLastConfigIndex, newLastConfigIndex);
        char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
        syncNodeEventLog(pSender->pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);
    }

  } else {
    // no last config
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }

  // build begin msg
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
  pMsg->privateTerm = pSender->privateTerm;

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pMsg);

  // event log
  do {
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);

  return 0;
}

203
// Stop the sender: close the reader (asserting success), drop the cached block,
// clear the start flag and record `finish`. The term is left untouched so it can
// still be printed. Always returns 0.
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  // close reader
  if (pSender->pReader != NULL) {
    int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(code == 0);
    pSender->pReader = NULL;
  }

  // release the cached block together with its length
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  // update flags
  pSender->finish = finish;
  pSender->start = false;

  // event log
  char *logStr = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
  syncNodeEventLog(pSender->pSyncNode, logStr);
  taosMemoryFree(logStr);

  return 0;
}

234
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
235
// seq = ack + 1, already updated
M
Minghao Li 已提交
236
// Read the next snapshot block through the FSM and send it to the target replica.
//
// The caller is expected to have advanced pSender->seq already (seq = ack + 1).
// When the read yields no data (blockLen <= 0) the sequence number is switched to
// SYNC_SNAPSHOT_SEQ_END and an empty end message is sent instead.
//
// Always returns 0; a failing read trips the ASSERT.
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (current seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
  if (pSender->blockLen > 0) {
    // has read data
  } else {
    // read finish, update seq to end
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }

  // build msg
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;
  pMsg->privateTerm = pSender->privateTerm;

  // copy the payload only when there is one: on the end message the block pointer
  // may be NULL, and passing a NULL source to memcpy is undefined even for length 0
  if (pSender->blockLen > 0) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pMsg);

  // event log
  do {
    char *eventLog = NULL;
    if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender finish");
    } else {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender sending");
    }
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);

  return 0;
}

M
Minghao Li 已提交
290 291
// send snapshot data from cache
// Re-send the block currently cached on the sender, using the current seq.
// Nothing is sent when no non-empty block is cached. Always returns 0.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  // nothing cached, nothing to resend
  if (pSender->pCurrentBlock == NULL || pSender->blockLen <= 0) {
    return 0;
  }

  // build msg carrying the cached block
  SyncSnapshotSend *pResend = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pResend->srcId = pSender->pSyncNode->myRaftId;
  pResend->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pResend->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pResend->beginIndex = pSender->snapshotParam.start;
  pResend->lastIndex = pSender->snapshot.lastApplyIndex;
  pResend->lastTerm = pSender->snapshot.lastApplyTerm;
  pResend->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pResend->lastConfig = pSender->lastConfig;
  pResend->seq = pSender->seq;
  pResend->privateTerm = pSender->privateTerm;
  memcpy(pResend->data, pSender->pCurrentBlock, pSender->blockLen);

  // send msg
  SRpcMsg rpc;
  syncSnapshotSend2RpcMsg(pResend, &rpc);
  syncNodeSendMsgById(&(pResend->destId), pSender->pSyncNode, &rpc);
  syncSnapshotSendDestroy(pResend);

  // event log
  char *logStr = snapshotSender2SimpleStr(pSender, "snapshot sender resend");
  syncNodeEventLog(pSender->pSyncNode, logStr);
  taosMemoryFree(logStr);

  return 0;
}

325 326 327 328 329 330
// Record an acknowledgement: the ack must equal the sequence number just sent
// (asserted); it is stored and the sender moves on to the next sequence number.
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  ASSERT(pMsg->ack == pSender->seq);

  pSender->ack = pMsg->ack;
  pSender->seq += 1;
}

M
Minghao Li 已提交
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
// Dump a sender as a cJSON tree of the form {"SSyncSnapshotSender": {...}}.
// A NULL sender yields an empty inner object. The returned tree is newly built
// here and handed to the caller (snapshotSender2Str deletes it with cJSON_Delete).
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pSender->start);
    cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
    cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
    cJSON_AddStringToObject(pRoot, "pReader", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      // NOTE(review): "pCurrentBlock" is added a second time here, so the object
      // carries a duplicate key; kept as-is because consumers may rely on it
      char *s;
      s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
      taosMemoryFree(s);
      s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
      taosMemoryFree(s);
    }

    cJSON *pSnapshot = cJSON_CreateObject();
    // the index is signed (it may hold SYNC_INDEX_INVALID), so print it as signed,
    // matching the laindex field in snapshotSender2SimpleStr
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
    cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->sendingMS);
    cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
    cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
  return pJson;
}

// Serialise a sender to JSON text via snapshotSender2Json and cJSON_Print.
// The intermediate tree is deleted here; the returned string is handed to the caller.
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
387 388
// Build a one-line description of the sender, prefixed with `event`.
//
// The string is allocated with taosMemoryMalloc (256 bytes, output truncated to fit);
// callers in this file release it with taosMemoryFree.
// Returns NULL when the allocation fails.
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
  int32_t len = 256;
  char   *s = taosMemoryMalloc(len);
  if (s == NULL) {
    // never format into a NULL buffer
    return NULL;
  }

  // resolve the destination replica's address into host:port
  SRaftId  destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(destId.addr, host, sizeof(host), &port);

  // %p requires a void * argument
  snprintf(s, len,
           "%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
           " seq:%d ack:%d finish:%d pterm:%" PRIu64
           " "
           "replica-index:%d %s:%d}",
           event, (void *)pSender, pSender->snapshotParam.start, pSender->snapshotParam.end,
           pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
           pSender->seq, pSender->ack, pSender->finish, pSender->privateTerm, pSender->replicaIndex, host, port);

  return s;
}

408 409 410
// Placeholder for starting a snapshot towards pDestId: it currently only logs the
// event and looks up the sender for the destination. The remaining steps
// (calculating the <start, end> index, creating the sender when missing, skipping
// when <start, end> is unchanged, sending the begin msg) are not implemented yet.
// Always returns 0.
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
  syncNodeEventLog(pSyncNode, "start snapshot ...");

  // looked up but not acted upon yet
  SSyncSnapshotSender *pExisting = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  (void)pExisting;

  return 0;
}
425

M
Minghao Li 已提交
426
// -------------------------------------
427
// Allocate and initialise a snapshot receiver that accepts data from fromId.
//
// Returns NULL when the FSM does not provide all three snapshot write callbacks,
// or when allocation fails (terrno is then set to TSDB_CODE_OUT_OF_MEMORY, the
// same way snapshotSenderCreate reports it).
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);

  SSyncSnapshotReceiver *pReceiver = NULL;
  if (condition) {
    pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
    if (pReceiver == NULL) {
      // report the failure instead of relying on an assertion, which may be
      // compiled out and would then let memset write through NULL
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    memset(pReceiver, 0, sizeof(*pReceiver));

    pReceiver->start = false;
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
    pReceiver->pWriter = NULL;
    pReceiver->pSyncNode = pSyncNode;
    pReceiver->fromId = fromId;
    pReceiver->term = pSyncNode->pRaftStore->currentTerm;
    pReceiver->privateTerm = 0;
    pReceiver->snapshot.data = NULL;
    pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
    pReceiver->snapshot.lastApplyTerm = 0;
    pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  } else {
    sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
  }

  return pReceiver;
}
M
Minghao Li 已提交
455

456 457
// Release a snapshot receiver: stop a still-open writer with the apply flag set to
// false (asserting success), then free the receiver. A NULL receiver is ignored.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer
  if (pReceiver->pWriter != NULL) {
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   false, &(pReceiver->snapshot));
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
470

471 472
// Return the receiver's start flag (true while a transfer is in progress).
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

473
// static do start by privateTerm, pBeginMsg
474
// receive first snapshot data
475
// write first block data
476
// Put the receiver into the started state from a begin message: adopt the sender's
// private term and id, copy the snapshot info and range from the message, and open
// a writer through the FSM. The receiver must not have a writer yet (asserted).
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  // who is sending, and under which terms
  pReceiver->fromId = pBeginMsg->srcId;
  pReceiver->privateTerm = pBeginMsg->privateTerm;
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;

  // progress restarts at the begin sequence number
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->start = true;

  // snapshot info announced by the sender
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;

  // range handed to the writer
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // start writer
  ASSERT(pReceiver->pWriter == NULL);
  int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
                                                                  &(pReceiver->snapshotParam), &(pReceiver->pWriter));
  ASSERT(code == 0);

  // event log
  char *logStr = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
  syncNodeEventLog(pReceiver->pSyncNode, logStr);
  taosMemoryFree(logStr);
}

505
// force stop
506
// Force the receiver to stop: close a still-open writer with the apply flag set to
// false, abandoning incomplete data (asserting success), and clear the start flag.
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver->pWriter != NULL) {
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   false, &(pReceiver->snapshot));
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  // event log
  char *logStr = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
  syncNodeEventLog(pReceiver->pSyncNode, logStr);
  taosMemoryFree(logStr);
}

M
Minghao Li 已提交
525 526
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
527
// Start the receiver from a begin message. If a transfer is already in progress it
// is force-stopped first (its incomplete data is abandoned) and then restarted.
// Always returns 0.
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    // already start: force close before starting again
    sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
    snapshotReceiverForceStop(pReceiver);
  }

  // first start, or start again after the forced stop
  snapshotReceiverDoStart(pReceiver, pBeginMsg);

  return 0;
}
M
Minghao Li 已提交
545

546 547
// stop the receiver: close a still-open writer without applying its data (if any),
// then set start = false
548
// Stop the receiver: close a still-open writer with the apply flag set to false
// (asserting success) and clear the start flag. Always returns 0.
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver->pWriter != NULL) {
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   false, &(pReceiver->snapshot));
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  // event log
  // NOTE(review): the snapshot info fetched here is not used afterwards; the call
  // is kept so the FSM callback is still invoked exactly as before
  SSnapshot fsmSnapshot;
  pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &fsmSnapshot);

  char *logStr = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
  syncNodeEventLog(pReceiver->pSyncNode, logStr);
  taosMemoryFree(logStr);

  return 0;
}

570
// when recv last snapshot block, apply data into snapshot
571
// Handle the last snapshot message (seq must be SYNC_SNAPSHOT_SEQ_END, asserted).
//
// With an open writer: write any trailing data, restore the log store from the
// snapshot at pMsg->lastIndex, raise the node's commit index and current term to
// the snapshot's values when those are larger, then stop the writer with the apply
// flag set to true and set ack to SYNC_SNAPSHOT_SEQ_END.
//
// Returns 0 on success, -1 when a step fails or when there is no writer. On the
// -1 paths the writer pointer is left as it was.
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);

  int32_t code = 0;
  if (pReceiver->pWriter != NULL) {
    // write data
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
      if (code != 0) {
        syncNodeErrorLog(pReceiver->pSyncNode, "snapshot write error");
        return -1;
      }
    }

    // reset wal
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "wal restore from snapshot error");
      return -1;
    }

    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

    // maybe update term
    if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
      pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
      raftStorePersist(pReceiver->pSyncNode->pRaftStore);
    }

    // stop writer, apply data
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
                                                           &(pReceiver->snapshot));
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
      return -1;
    }
    pReceiver->pWriter = NULL;

    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  } else {
    // no writer was ever opened; say so instead of reusing the stop-writer message
    syncNodeErrorLog(pReceiver->pSyncNode, "snapshot receiver finish error, writer is null");
    return -1;
  }

  // event log
  do {
    // NOTE(review): the fetched snapshot info is not used afterwards; the call is
    // kept so the FSM callback is still invoked exactly as before
    SSnapshot snapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);

  return 0;
}

635 636
// apply data block
// update progress
637 638 639 640 641
// Handle an intermediate data message: its seq must be ack + 1 (asserted). With an
// open writer the payload (if any) is written through the FSM, asserting success,
// and ack advances to the message's seq. Without a writer nothing else happens.
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  ASSERT(pMsg->seq == pReceiver->ack + 1);

  if (pReceiver->pWriter == NULL) {
    return;
  }

  // apply data block
  if (pMsg->dataLen > 0) {
    int32_t writeCode = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                      pMsg->data, pMsg->dataLen);
    ASSERT(writeCode == 0);
  }

  // update progress
  pReceiver->ack = pMsg->seq;

  // event log
  char *logStr = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
  syncNodeEventLog(pReceiver->pSyncNode, logStr);
  taosMemoryFree(logStr);
}

// Dump a receiver as a cJSON tree of the form {"SSyncSnapshotReceiver": {...}}.
// A NULL receiver yields an empty inner object. The returned tree is newly built
// here and handed to the caller (snapshotReceiver2Str deletes it with cJSON_Delete).
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pReceiver != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
    cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pRoot, "pWriter", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);

    // sender id: raw address plus its decoded host/port form
    cJSON *pFromId = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->fromId.addr);
    cJSON_AddStringToObject(pFromId, "addr", u64buf);
    {
      uint64_t u64 = pReceiver->fromId.addr;
      cJSON   *pTmp = pFromId;
      char     host[128] = {0};
      uint16_t port;
      syncUtilU642Addr(u64, host, sizeof(host), &port);
      cJSON_AddStringToObject(pTmp, "addr_host", host);
      cJSON_AddNumberToObject(pTmp, "addr_port", port);
    }
    cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pRoot, "fromId", pFromId);

    // both indexes are signed (they are initialised to SYNC_INDEX_INVALID), so print
    // them as signed, matching laindex/lcindex in snapshotReceiver2SimpleStr
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->snapshot.lastConfigIndex);
    cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
  return pJson;
}

// Serialise a receiver to JSON text via snapshotReceiver2Json and cJSON_Print.
// The intermediate tree is deleted here; the returned string is handed to the caller.
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return text;
}
M
Minghao Li 已提交
716

M
Minghao Li 已提交
717 718
// Build a one-line description of the receiver, prefixed with `event`.
//
// The string is allocated with taosMemoryMalloc (256 bytes, output truncated to fit);
// callers in this file release it with taosMemoryFree.
// Returns NULL when the allocation fails.
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
  int32_t len = 256;
  char   *s = taosMemoryMalloc(len);
  if (s == NULL) {
    // never format into a NULL buffer
    return NULL;
  }

  // resolve the sender's address into host:port
  SRaftId  fromId = pReceiver->fromId;
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);

  // %p requires a void * argument
  snprintf(s, len,
           "%s {%p start:%d ack:%d term:%" PRIu64 " pterm:%" PRIu64 " from:%s:%d s-param:%" PRId64 " e-param:%" PRId64
           " laindex:%" PRId64 " laterm:%" PRIu64
           " "
           "lcindex:%" PRId64 "}",
           event, (void *)pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host,
           port, pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex,
           pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);

  return s;
}

738 739 740 741 742 743
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_BEGIN, start receiver, update privateTerm
// condition 2, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
// condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
// condition 4, got data, update ack
M
Minghao Li 已提交
744
//
M
Minghao Li 已提交
745
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
746
  // get receiver
747 748
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
749
  int32_t                code = 0;
M
Minghao Li 已提交
750

751 752
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
753 754
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
755
        // condition 1
756
        // begin, no data
757
        snapshotReceiverStart(pReceiver, pMsg);
M
Minghao Li 已提交
758 759 760
        needRsp = true;

      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
761
        // condition 2
M
Minghao Li 已提交
762
        // end, finish FSM
763 764 765 766
        code = snapshotReceiverFinish(pReceiver, pMsg);
        if (code == 0) {
          snapshotReceiverStop(pReceiver);
        }
767
        needRsp = true;
768

769 770
        // maybe update lastconfig
        if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
M
Minghao Li 已提交
771
          SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
772

773 774 775
          // update new config myIndex
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
M
Minghao Li 已提交
776 777 778

          // do config change
          syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
779 780
        }

M
Minghao Li 已提交
781
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
782
        // condition 3
783 784
        // force close
        snapshotReceiverForceStop(pReceiver);
M
Minghao Li 已提交
785 786 787
        needRsp = false;

      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
788
        // condition 4
M
Minghao Li 已提交
789 790
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
791
          snapshotReceiverGotData(pReceiver, pMsg);
M
Minghao Li 已提交
792 793 794 795
        }
        needRsp = true;

      } else {
M
Minghao Li 已提交
796 797 798 799 800 801 802 803 804 805
        // error log
        do {
          char logBuf[96];
          snprintf(logBuf, sizeof(logBuf), "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
          char *eventLog = snapshotReceiver2SimpleStr(pReceiver, logBuf);
          syncNodeErrorLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);

        return -1;
806
      }
M
Minghao Li 已提交
807

808
      // send ack
M
Minghao Li 已提交
809
      if (needRsp) {
810
        // build msg
M
Minghao Li 已提交
811 812 813 814 815 816
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
817 818 819
        pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
        pRspMsg->code = 0;
        pRspMsg->privateTerm = pReceiver->privateTerm;  // receiver maybe already closed
M
Minghao Li 已提交
820

821
        // send msg
M
Minghao Li 已提交
822 823 824 825 826
        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
        syncSnapshotRspDestroy(pRspMsg);
      }
827

828 829 830 831 832 833 834
    } else {
      // error log
      do {
        char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
        syncNodeErrorLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);
835 836

      return -1;
M
Minghao Li 已提交
837
    }
M
Minghao Li 已提交
838
  } else {
839 840 841 842 843 844
    // error log
    do {
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);
845 846

    return -1;
M
Minghao Li 已提交
847
  }
M
Minghao Li 已提交
848

M
Minghao Li 已提交
849 850 851
  return 0;
}

852 853 854 855 856 857
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
M
Minghao Li 已提交
858
int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
859 860
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
861 862
    sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
    return -1;
863 864
  }

M
Minghao Li 已提交
865
  // get sender
866
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
867 868 869 870 871
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
872 873
      // condition 1
      // receive ack is finish, close sender
M
Minghao Li 已提交
874
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
875
        snapshotSenderStop(pSender, true);
M
Minghao Li 已提交
876 877 878
        return 0;
      }

879
      // condition 2
M
Minghao Li 已提交
880
      // send next msg
881
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
882
        // update sender ack
883
        snapshotSenderUpdateProgress(pSender, pMsg);
M
Minghao Li 已提交
884
        snapshotSend(pSender);
885

M
Minghao Li 已提交
886
      } else if (pMsg->ack == pSender->seq - 1) {
887
        // maybe resend
M
Minghao Li 已提交
888
        snapshotReSend(pSender);
889

M
Minghao Li 已提交
890
      } else {
891
        // error log
892
        do {
M
Minghao Li 已提交
893 894
          char logBuf[96];
          snprintf(logBuf, sizeof(logBuf), "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
895 896 897 898 899 900
          char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
          syncNodeErrorLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);

        return -1;
901
      }
902 903 904 905 906 907 908 909 910
    } else {
      // error log
      do {
        char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
        syncNodeErrorLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);

      return -1;
911
    }
M
Minghao Li 已提交
912
  } else {
913 914 915 916 917 918 919 920
    // error log
    do {
      char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);

    return -1;
921 922 923
  }

  return 0;
924
}