syncSnapshot.c 34.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
M
Minghao Li 已提交
25

26
//----------------------------------
M
Minghao Li 已提交
27
// Allocate and initialize a snapshot sender bound to the replica at `replicaIndex`.
//
// Returns NULL (after logging an error) when the FSM lacks any of the three
// snapshot-read callbacks (start / stop / do read). Otherwise returns a heap
// object that is released with snapshotSenderDestroy().
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  bool readerMissing = (pSyncNode->pFsm->FpSnapshotStartRead == NULL) ||
                       (pSyncNode->pFsm->FpSnapshotStopRead == NULL) ||
                       (pSyncNode->pFsm->FpSnapshotDoRead == NULL);
  if (readerMissing) {
    sError("snapshotSenderCreate cannot create sender");
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // transfer state: nothing started, no sequence numbers, no reader, no cached block
  pNew->start = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;

  // identity: owning node, target replica, raft term at creation time
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;
  pNew->term = pSyncNode->pRaftStore->currentTerm;

  // privateTerm starts from the current millisecond timestamp plus 100
  pNew->privateTerm = taosGetTimestampMs() + 100;

  // capture the FSM's current snapshot description
  pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &(pNew->snapshot));
  pNew->finish = false;

  return pNew;
}
M
Minghao Li 已提交
55

M
Minghao Li 已提交
56 57
// Free a sender together with its cached data block, if any.
// A NULL sender is accepted and ignored.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // release the cached block first, then the sender itself
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }
  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
64

M
Minghao Li 已提交
65 66
// Return the sender's `start` flag (true while a snapshot transfer is marked as started).
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

M
Minghao Li 已提交
67
// begin send snapshot (current term, seq begin)
M
Minghao Li 已提交
68 69
// Begin a new snapshot transfer towards the replica this sender is bound to.
//
// Asserted preconditions: the sender is not started and has no open reader.
// Steps:
//   1. reset seq to SYNC_SNAPSHOT_SEQ_BEGIN and ack to SYNC_SNAPSHOT_SEQ_INVALID
//   2. open the FSM snapshot reader
//   3. drop any cached block
//   4. refresh the snapshot description and, when it carries a config index,
//      load the matching config from the log store
//   5. bump privateTerm, mark the sender started
//   6. send the begin message (zero-length payload) and log it
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  ASSERT(!snapshotSenderIsStart(pSender));

  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // open snapshot reader
  ASSERT(pSender->pReader == NULL);
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
  ASSERT(ret == 0);

  // Drop any block left over from an earlier transfer. The pointer must be
  // cleared after the free: snapshotSend(), snapshotSenderStop() and
  // snapshotSenderDestroy() all free pCurrentBlock whenever it is non-NULL,
  // so a stale value here would be freed a second time.
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }
  pSender->blockLen = 0;

  // get current snapshot info
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
    // the snapshot references a config entry: decode it from the log store
    SSyncRaftEntry *pEntry =
        pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex);
    ASSERT(pEntry != NULL);

    SRpcMsg cfgRpcMsg;
    syncEntry2OriginalRpc(pEntry, &cfgRpcMsg);
    SSyncCfg lastConfig;
    int32_t  cfgRet = syncCfgFromStr(cfgRpcMsg.pCont, &lastConfig);
    ASSERT(cfgRet == 0);
    pSender->lastConfig = lastConfig;

    rpcFreeCont(cfgRpcMsg.pCont);
    syncEntryDestory(pEntry);

  } else {
    // no config index in the snapshot: send an all-zero config
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }

  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);
  pSender->finish = false;
  pSender->start = true;

  // build begin msg
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
  pMsg->privateTerm = pSender->privateTerm;

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

  // resolve the peer address for the log line
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);

  if (gRaftDetailLog) {
    // detailed logging also dumps the full message
    char *msgStr = syncSnapshotSend2Str(pMsg);
    sDebug(
        "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
        "lastApplyIndex:%ld "
        "lastApplyTerm:%lu "
        "lastConfigIndex:%ld privateTerm:%lu send "
        "msg:%s",
        pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
        pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
        pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
        pSender->privateTerm, msgStr);
    taosMemoryFree(msgStr);
  } else {
    sDebug(
        "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
        "lastApplyIndex:%ld "
        "lastApplyTerm:%lu "
        "lastConfigIndex:%ld privateTerm:%lu",
        pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
        pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
        pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
        pSender->privateTerm);
  }

  syncSnapshotSendDestroy(pMsg);
}

169
#if 0
// Disabled legacy variant of snapshotSenderStart(); the `#if 0` guard keeps it
// out of the build. It calls snapshotSenderDoStart(), which is not defined in
// the visible part of this file, so it cannot be re-enabled as is.
//
// when entry in snapshot, start sender
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  if (!(pSender->start)) {
    // start
    snapshotSenderDoStart(pSender);
    pSender->start = true;
  } else {
    // already start
    ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term);

    // if current term is higher, need start again
    if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) {
      // force peer rollback
      SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
      pMsg->srcId = pSender->pSyncNode->myRaftId;
      pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
      pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
      pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
      pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
      pMsg->seq = SYNC_SNAPSHOT_SEQ_FORCE_CLOSE;

      SRpcMsg rpcMsg;
      syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
      syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

      char *msgStr = syncSnapshotSend2Str(pMsg);
      sTrace("snapshot send force close seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
      taosMemoryFree(msgStr);

      syncSnapshotSendDestroy(pMsg);

      // close reader
      int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
      ASSERT(ret == 0);
      pSender->pReader = NULL;

      // start again
      snapshotSenderDoStart(pSender);
      pSender->start = true;
    } else {
      // current term, do nothing
      ASSERT(pSender->pSyncNode->pRaftStore->currentTerm == pSender->term);
    }
  }

  char *s = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStart %s", s);
  taosMemoryFree(s);
}
#endif
M
Minghao Li 已提交
220 221

// Stop the sender: close the FSM snapshot reader if one is open, release the
// cached block if there is one, and clear the `start` flag. When gRaftDetailLog
// is set, the resulting sender state is logged.
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
  // close the reader, if any
  if (pSender->pReader != NULL) {
    int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(code == 0);
    pSender->pReader = NULL;
  }

  // drop the cached block, if any
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->blockLen = 0;
    pSender->pCurrentBlock = NULL;
  }

  pSender->start = false;

  if (!gRaftDetailLog) {
    return;
  }

  char *dump = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStop %s", dump);
  taosMemoryFree(dump);
}

M
Minghao Li 已提交
243 244
// when the sender receives an ack, call this function to send the msg for seq
// seq = ack + 1, already updated
M
Minghao Li 已提交
245 246 247 248
// Read the next block from the FSM snapshot reader and send it to the peer.
//
// The block cached by the previous call is freed first. If the reader yields
// no data (blockLen is not positive), seq is switched to SYNC_SNAPSHOT_SEQ_END
// and the message goes out with no payload. The block just read stays cached
// in pSender->pCurrentBlock so that snapshotReSend() can retransmit it.
//
// Always returns 0.
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
  if (!(pSender->blockLen > 0)) {
    // read finish
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }

  // build msg
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;
  pMsg->privateTerm = pSender->privateTerm;

  // Copy the payload only when there is one. At end of stream the reader may
  // leave pCurrentBlock NULL, and passing a null pointer to memcpy is undefined
  // behaviour even for a zero-length copy.
  if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

  // resolve the peer address for the log line
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);

  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
    // final message of the transfer
    if (gRaftDetailLog) {
      char *msgStr = syncSnapshotSend2Str(pMsg);
      sDebug(
          "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
          "lastApplyIndex:%ld "
          "lastApplyTerm:%lu "
          "lastConfigIndex:%ld privateTerm:%lu send "
          "msg:%s",
          pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
          pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
          pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
          pSender->privateTerm, msgStr);
      taosMemoryFree(msgStr);
    } else {
      sDebug(
          "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
          "lastApplyIndex:%ld "
          "lastApplyTerm:%lu "
          "lastConfigIndex:%ld privateTerm:%lu",
          pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
          pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
          pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
          pSender->privateTerm);
    }
  } else {
    // intermediate data message
    sDebug(
        "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
        "lastApplyIndex:%ld "
        "lastApplyTerm:%lu "
        "lastConfigIndex:%ld privateTerm:%lu",
        pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
        pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
        pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
        pSender->privateTerm);
  }

  syncSnapshotSendDestroy(pMsg);
  return 0;
}

M
Minghao Li 已提交
327 328 329 330 331 332 333 334 335
// send snapshot data from cache
// Retransmit the block currently cached in pSender->pCurrentBlock, using the
// sender's current seq. Does nothing when no block is cached.
//
// Always returns 0.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock == NULL) {
    return 0;
  }

  SSyncNode *pNode = pSender->pSyncNode;

  // rebuild the message from the cached block
  SyncSnapshotSend *pResend = syncSnapshotSendBuild(pSender->blockLen, pNode->vgId);
  pResend->srcId = pNode->myRaftId;
  pResend->destId = (pNode->replicasId)[pSender->replicaIndex];
  pResend->term = pNode->pRaftStore->currentTerm;
  pResend->lastIndex = pSender->snapshot.lastApplyIndex;
  pResend->lastTerm = pSender->snapshot.lastApplyTerm;
  pResend->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pResend->lastConfig = pSender->lastConfig;
  pResend->seq = pSender->seq;
  // NOTE(review): unlike snapshotSend(), privateTerm is not copied into the
  // message here, although it is printed in the log below — confirm intended.
  memcpy(pResend->data, pSender->pCurrentBlock, pSender->blockLen);

  // hand the message to the transport
  SRpcMsg outMsg;
  syncSnapshotSend2RpcMsg(pResend, &outMsg);
  syncNodeSendMsgById(&(pResend->destId), pNode, &outMsg);

  // resolve the peer address for the log line
  char     peerHost[128];
  uint16_t peerPort;
  syncUtilU642Addr(pNode->replicasId[pSender->replicaIndex].addr, peerHost, sizeof(peerHost), &peerPort);

  if (gRaftDetailLog) {
    char *dump = syncSnapshotSend2Str(pResend);
    sDebug(
        "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
        "privateTerm:%lu send "
        "msg:%s",
        pNode->vgId, syncUtilState2String(pNode->state), pNode->commitIndex, pNode->pRaftStore->currentTerm, peerHost,
        peerPort, pSender->seq, pSender->ack, pSender->privateTerm, dump);
    taosMemoryFree(dump);
  } else {
    sDebug(
        "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
        "privateTerm:%lu",
        pNode->vgId, syncUtilState2String(pNode->state), pNode->commitIndex, pNode->pRaftStore->currentTerm, peerHost,
        peerPort, pSender->seq, pSender->ack, pSender->privateTerm);
  }

  syncSnapshotSendDestroy(pResend);
  return 0;
}

M
Minghao Li 已提交
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399
// Build a cJSON tree describing the sender, wrapped as
// { "SSyncSnapshotSender": { ...fields... } }. A NULL sender yields an empty
// inner object. The caller owns the returned tree.
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   text[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pSender != NULL) {
    // transfer progress
    cJSON_AddNumberToObject(pFields, "start", pSender->start);
    cJSON_AddNumberToObject(pFields, "seq", pSender->seq);
    cJSON_AddNumberToObject(pFields, "ack", pSender->ack);

    // reader handle and cached block (pointer values only)
    snprintf(text, sizeof(text), "%p", pSender->pReader);
    cJSON_AddStringToObject(pFields, "pReader", text);

    snprintf(text, sizeof(text), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pFields, "pCurrentBlock", text);
    cJSON_AddNumberToObject(pFields, "blockLen", pSender->blockLen);

    // block content in two textual renderings
    // NOTE(review): the first rendering reuses the key "pCurrentBlock" already
    // added above, so the object carries that key twice — confirm intended.
    if (pSender->pCurrentBlock != NULL) {
      char *rendered = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock", rendered);
      taosMemoryFree(rendered);

      rendered = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock2", rendered);
      taosMemoryFree(rendered);
    }

    // nested snapshot description
    cJSON *pSnap = cJSON_CreateObject();
    snprintf(text, sizeof(text), "%lu", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnap, "lastApplyIndex", text);
    snprintf(text, sizeof(text), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnap, "lastApplyTerm", text);
    cJSON_AddItemToObject(pFields, "snapshot", pSnap);

    // remaining scalar fields
    snprintf(text, sizeof(text), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pFields, "sendingMS", text);
    snprintf(text, sizeof(text), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", text);
    cJSON_AddNumberToObject(pFields, "replicaIndex", pSender->replicaIndex);
    snprintf(text, sizeof(text), "%lu", pSender->term);
    cJSON_AddStringToObject(pFields, "term", text);
    snprintf(text, sizeof(text), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pFields, "privateTerm", text);
    cJSON_AddNumberToObject(pFields, "finish", pSender->finish);
  }

  cJSON *pEnvelope = cJSON_CreateObject();
  cJSON_AddItemToObject(pEnvelope, "SSyncSnapshotSender", pFields);
  return pEnvelope;
}

// Render the sender as text via snapshotSender2Json() and cJSON_Print().
// The JSON tree is deleted before returning; callers in this file release the
// returned string with taosMemoryFree().
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

// -------------------------------------
430
// Allocate and initialize a snapshot receiver for messages coming from `fromId`.
//
// Returns NULL (after an info log) when the FSM lacks any of the three
// snapshot-write callbacks (start / stop / do write). Otherwise returns a heap
// object that is released with snapshotReceiverDestroy().
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  bool writerMissing = (pSyncNode->pFsm->FpSnapshotStartWrite == NULL) ||
                       (pSyncNode->pFsm->FpSnapshotStopWrite == NULL) ||
                       (pSyncNode->pFsm->FpSnapshotDoWrite == NULL);
  if (writerMissing) {
    sInfo("snapshotReceiverCreate cannot create receiver");
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // not started, no writer yet, ack at the begin sequence
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  // owning node, peer identity, raft term at creation time
  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = 0;

  return pNew;
}
M
Minghao Li 已提交
454

455 456 457 458 459
// Free a receiver. A NULL receiver is accepted and ignored.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
460

461 462
// Return the receiver's `start` flag (true while a snapshot reception is marked as started).
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

M
Minghao Li 已提交
463
// begin receive snapshot msg (current term, seq begin)
464
// Reset the receiver's bookkeeping for a fresh transfer and open the FSM
// snapshot writer. Asserts that no writer is open and that the writer opens
// successfully. Does not touch the `start` flag; the caller sets it.
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
  SSyncNode *pNode = pReceiver->pSyncNode;

  // record who is sending and under which terms
  pReceiver->fromId = fromId;
  pReceiver->privateTerm = privateTerm;
  pReceiver->term = pNode->pRaftStore->currentTerm;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // open the writer
  ASSERT(pReceiver->pWriter == NULL);
  int32_t code = pNode->pFsm->FpSnapshotStartWrite(pNode->pFsm, &(pReceiver->pWriter));
  ASSERT(code == 0);
}

// if the receiver receives a msg with seq == SYNC_SNAPSHOT_SEQ_BEGIN, start the receiver
// if it is already started, force close it and start again
477
// Start (or restart) receiving a snapshot from `fromId`.
//
// If a reception is already in progress, its writer is closed with the
// "do not apply" flag so the partial data is abandoned, and the receiver is
// started again from scratch. When gRaftDetailLog is set, the resulting
// receiver state is logged.
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    sInfo("snapshot recv, receiver already start");

    // force close, abandon incomplete data
    int32_t code =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  // common path for first start and restart
  snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
  pReceiver->start = true;

  if (gRaftDetailLog) {
    char *dump = snapshotReceiver2Str(pReceiver);
    sInfo("snapshotReceiverStart %s", dump);
    taosMemoryFree(dump);
  }
}
M
Minghao Li 已提交
504

505
// Stop the receiver: close the FSM snapshot writer if one is still open
// (passing `false` as its last argument) and clear the `start` flag. When
// gRaftDetailLog is set, the resulting receiver state is logged.
//
// NOTE(review): `apply` has no effect here — the only statement that used it
// is commented out in the original, and the writer is always closed with
// `false`. Confirm this is intended.
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
  (void)apply;

  if (pReceiver->pWriter != NULL) {
    int32_t code =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  if (!gRaftDetailLog) {
    return;
  }

  char *dump = snapshotReceiver2Str(pReceiver);
  sInfo("snapshotReceiverStop %s", dump);
  taosMemoryFree(dump);
}

// Build a cJSON tree describing the receiver, wrapped as
// { "SSyncSnapshotReceiver": { ...fields... } }. A NULL receiver yields an
// empty inner object. The caller owns the returned tree.
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   text[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pReceiver != NULL) {
    // reception progress
    cJSON_AddNumberToObject(pFields, "start", pReceiver->start);
    cJSON_AddNumberToObject(pFields, "ack", pReceiver->ack);

    // writer handle and owning node (pointer values only)
    snprintf(text, sizeof(text), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pFields, "pWriter", text);

    snprintf(text, sizeof(text), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", text);

    // peer identity: raw address, decoded host/port, vgId
    cJSON *pPeer = cJSON_CreateObject();
    snprintf(text, sizeof(text), "%lu", pReceiver->fromId.addr);
    cJSON_AddStringToObject(pPeer, "addr", text);

    uint64_t rawAddr = pReceiver->fromId.addr;
    char     peerHost[128] = {0};
    uint16_t peerPort;
    syncUtilU642Addr(rawAddr, peerHost, sizeof(peerHost), &peerPort);
    cJSON_AddStringToObject(pPeer, "addr_host", peerHost);
    cJSON_AddNumberToObject(pPeer, "addr_port", peerPort);

    cJSON_AddNumberToObject(pPeer, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pFields, "fromId", pPeer);

    // terms
    snprintf(text, sizeof(text), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pFields, "term", text);

    snprintf(text, sizeof(text), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pFields, "privateTerm", text);
  }

  cJSON *pEnvelope = cJSON_CreateObject();
  cJSON_AddItemToObject(pEnvelope, "SSyncSnapshotReceiver", pFields);
  return pEnvelope;
}

// Render the receiver as text via snapshotReceiver2Json() and cJSON_Print().
// The JSON tree is deleted before returning; callers in this file release the
// returned string with taosMemoryFree().
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}
M
Minghao Li 已提交
573

574
// receiver do something
M
Minghao Li 已提交
575
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
576
  // get receiver
577 578
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
579
  int32_t                writeCode = 0;
M
Minghao Li 已提交
580

581 582
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
583 584 585
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        // begin
586
        snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId);
M
Minghao Li 已提交
587 588 589
        pReceiver->ack = pMsg->seq;
        needRsp = true;

M
Minghao Li 已提交
590 591 592
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
593 594 595

        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
596
          sDebug(
M
Minghao Li 已提交
597 598
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
              "lastIndex:%ld, "
M
Minghao Li 已提交
599
              "lastTerm:%lu, "
M
Minghao Li 已提交
600
              "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
M
Minghao Li 已提交
601 602 603
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
              pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
604 605
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
606
          sDebug(
M
Minghao Li 已提交
607 608
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
              "lastIndex:%ld, "
M
Minghao Li 已提交
609
              "lastTerm:%lu, "
M
Minghao Li 已提交
610
              "lastConfigIndex:%ld privateTerm:%lu",
M
Minghao Li 已提交
611 612 613
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
              pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex, pReceiver->privateTerm);
614
        }
M
Minghao Li 已提交
615

M
Minghao Li 已提交
616 617
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
618 619 620
        writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
        ASSERT(writeCode == 0);

M
Minghao Li 已提交
621
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
M
Minghao Li 已提交
622
        pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
623

624 625
        // maybe update lastconfig
        if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
M
Minghao Li 已提交
626 627
          // int32_t  oldReplicaNum = pSyncNode->replicaNum;
          SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
628

629 630 631 632 633 634
          // update new config myIndex
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
          bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg);

#if 0
635 636 637 638 639 640 641 642 643 644 645
          // update new config myIndex
          bool     IamInNew = false;
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
            if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
                pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
              newSyncCfg.myIndex = i;
              IamInNew = true;
              break;
            }
          }
646
#endif
647

648
          bool isDrop;
649
          if (IamInNew) {
650
            sDebug(
M
Minghao Li 已提交
651 652
                "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
                "lastTerm:%lu, "
653
                "lastConfigIndex:%ld ",
M
Minghao Li 已提交
654 655
                pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
                pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
656 657
            syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
          } else {
M
Minghao Li 已提交
658
            sDebug(
M
Minghao Li 已提交
659 660
                "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
                "newCfg, "
661
                "lastIndex:%ld, lastTerm:%lu, "
662
                "lastConfigIndex:%ld ",
M
Minghao Li 已提交
663 664
                pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
                pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
665 666 667 668
          }

          // change isStandBy to normal
          if (!isDrop) {
M
Minghao Li 已提交
669 670 671 672 673 674 675 676 677 678
            char  tmpbuf[512];
            char *oldStr = syncCfg2Str(&oldSyncCfg);
            char *newStr = syncCfg2Str(&newSyncCfg);
            syncUtilJson2Line(oldStr);
            syncUtilJson2Line(newStr);
            snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s  -->  %s", oldSyncCfg.replicaNum,
                     newSyncCfg.replicaNum, oldStr, newStr);
            taosMemoryFree(oldStr);
            taosMemoryFree(newStr);

679
            if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
680
              syncNodeBecomeLeader(pSyncNode, tmpbuf);
681
            } else {
682
              syncNodeBecomeFollower(pSyncNode, tmpbuf);
683
            }
684
          }
685 686
        }

M
Minghao Li 已提交
687 688
        SSnapshot snapshot;
        pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
689

M
Minghao Li 已提交
690 691 692
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
693 694 695

        if (gRaftDetailLog) {
          char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
M
Minghao Li 已提交
696
          sDebug(
M
Minghao Li 已提交
697 698
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
              "index:%ld, "
699
              "snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
700
              "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
M
Minghao Li 已提交
701 702 703
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
              pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
              snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr);
704 705
          taosMemoryFree(logSimpleStr);
        } else {
M
Minghao Li 已提交
706
          sDebug(
M
Minghao Li 已提交
707 708
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
              "index:%ld, "
709
              "snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
710
              "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
M
Minghao Li 已提交
711 712 713
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
              pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
              snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm);
714
        }
M
Minghao Li 已提交
715

M
Minghao Li 已提交
716
        pReceiver->pWriter = NULL;
717
        snapshotReceiverStop(pReceiver, true);
M
Minghao Li 已提交
718
        pReceiver->ack = pMsg->seq;
M
Minghao Li 已提交
719
        needRsp = true;
M
Minghao Li 已提交
720

721 722
        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
723
          sDebug(
M
Minghao Li 已提交
724 725
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
              "lastIndex:%ld, lastTerm:%lu, "
M
Minghao Li 已提交
726
              "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
M
Minghao Li 已提交
727
              pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
M
Minghao Li 已提交
728 729
              pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
              pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
730 731
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
732
          sDebug(
M
Minghao Li 已提交
733 734
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
              "lastIndex:%ld, lastTerm:%lu, "
M
Minghao Li 已提交
735
              "lastConfigIndex:%ld, privateTerm:%lu",
M
Minghao Li 已提交
736
              pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
M
Minghao Li 已提交
737 738
              pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
              pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
739
        }
M
Minghao Li 已提交
740

M
Minghao Li 已提交
741 742
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
743
        snapshotReceiverStop(pReceiver, false);
M
Minghao Li 已提交
744 745
        needRsp = false;

M
Minghao Li 已提交
746 747 748 749
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);

750 751
        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
752
          sDebug(
M
Minghao Li 已提交
753 754
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
              "lastIndex:%ld, "
755
              "lastTerm:%lu, "
M
Minghao Li 已提交
756
              "lastConfigIndex:%ld, privateTerm:%lu, recv "
757
              "msg:%s",
M
Minghao Li 已提交
758
              pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
M
Minghao Li 已提交
759 760
              pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
              pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
761 762
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
763
          sDebug(
M
Minghao Li 已提交
764 765
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
              "lastIndex:%ld, "
766
              "lastTerm:%lu, "
M
Minghao Li 已提交
767
              "lastConfigIndex:%ld, privateTerm:%lu",
M
Minghao Li 已提交
768
              pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
M
Minghao Li 已提交
769 770
              pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
              pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
771
        }
M
Minghao Li 已提交
772

M
Minghao Li 已提交
773 774 775
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
776 777 778
          writeCode =
              pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
          ASSERT(writeCode == 0);
M
Minghao Li 已提交
779 780 781 782
          pReceiver->ack = pMsg->seq;
        }
        needRsp = true;

M
Minghao Li 已提交
783 784 785
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
786 787 788

        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
789
          sDebug(
M
Minghao Li 已提交
790 791
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
              "lastIndex:%ld, "
792
              "lastTerm:%lu, "
M
Minghao Li 已提交
793
              "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
M
Minghao Li 已提交
794 795 796
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
              pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
797 798
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
799
          sDebug(
M
Minghao Li 已提交
800 801
              "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
              "lastIndex:%ld, "
802
              "lastTerm:%lu, "
M
Minghao Li 已提交
803
              "lastConfigIndex:%ld, privateTerm:%lu",
M
Minghao Li 已提交
804 805 806
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
              pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex, pReceiver->privateTerm);
807
        }
M
Minghao Li 已提交
808

M
Minghao Li 已提交
809 810
      } else {
        ASSERT(0);
811
      }
M
Minghao Li 已提交
812

M
Minghao Li 已提交
813 814 815 816 817 818 819 820
      if (needRsp) {
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
        pRspMsg->ack = pReceiver->ack;
821
        pRspMsg->code = writeCode;
M
Minghao Li 已提交
822
        pRspMsg->privateTerm = pReceiver->privateTerm;
M
Minghao Li 已提交
823 824 825 826

        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
M
Minghao Li 已提交
827

M
Minghao Li 已提交
828 829
        syncSnapshotRspDestroy(pRspMsg);
      }
M
Minghao Li 已提交
830
    }
M
Minghao Li 已提交
831 832
  } else {
    syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
M
Minghao Li 已提交
833
  }
M
Minghao Li 已提交
834

M
Minghao Li 已提交
835 836 837
  return 0;
}

M
Minghao Li 已提交
838 839
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
840
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
841 842 843 844 845 846
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    sInfo("recv SyncSnapshotRsp maybe replica already dropped");
    return 0;
  }

M
Minghao Li 已提交
847
  // get sender
848
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
849 850 851 852 853
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
854 855
      // receiver ack is finish, close sender
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
856
        pSender->finish = true;
M
Minghao Li 已提交
857 858 859 860 861
        snapshotSenderStop(pSender);
        return 0;
      }

      // send next msg
862
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
863
        // update sender ack
864 865
        pSender->ack = pMsg->ack;
        (pSender->seq)++;
M
Minghao Li 已提交
866
        snapshotSend(pSender);
867

M
Minghao Li 已提交
868 869
      } else if (pMsg->ack == pSender->seq - 1) {
        snapshotReSend(pSender);
870

M
Minghao Li 已提交
871 872
      } else {
        ASSERT(0);
873 874
      }
    }
M
Minghao Li 已提交
875 876
  } else {
    syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
877 878 879
  }

  return 0;
880
}