syncSnapshot.c 35.9 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21
#include "syncRaftLog.h"
M
Minghao Li 已提交
22
#include "syncRaftStore.h"
M
Minghao Li 已提交
23
#include "syncReplication.h"
M
Minghao Li 已提交
24
#include "syncUtil.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26
// Allocate and initialize a snapshot sender bound to one replica.
//
// pSyncNode     owning sync node; its FSM supplies the snapshot read callbacks
// replicaIndex  stored in the sender; used elsewhere in this file to index
//               pSyncNode->replicasId when addressing messages
//
// return the new sender, or NULL when a required FSM callback is missing or
// when allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY in that case)
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // FpGetSnapshotInfo is invoked below, so it is validated together with the
  // read callbacks instead of being dereferenced unchecked
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return NULL;
  }

  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
  pSender->term = pSyncNode->pRaftStore->currentTerm;
  pSender->startTime = 0;
  pSender->endTime = 0;
  // let the FSM fill in the sender's snapshot descriptor
  pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
  pSender->finish = false;

  return pSender;
}
M
Minghao Li 已提交
54

M
Minghao Li 已提交
55
// Release a snapshot sender: frees the cached block, stops the FSM reader if
// one is still open, then frees the sender itself. A NULL sender is a no-op.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender != NULL) {
    // cached data block
    if (pSender->pCurrentBlock != NULL) {
      taosMemoryFree(pSender->pCurrentBlock);
      pSender->pCurrentBlock = NULL;
    }

    // FSM reader
    if (pSender->pReader != NULL) {
      SSyncNode *pNode = pSender->pSyncNode;
      pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
      pSender->pReader = NULL;
    }

    // the sender object
    taosMemoryFree(pSender);
  }
}
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75
// Report whether the sender's start flag is set.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

M
Minghao Li 已提交
76
// Reset the sender's transfer state, mark it started, and send a message with
// seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT to the destination replica.
// return 0 on success, -1 when the message cannot be built or sent
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  SSyncNode *pNode = pSender->pSyncNode;

  // progress
  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // reader and cached block
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;

  // snapshot description is unknown until the transfer really begins
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
  memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));

  // timing and term
  pSender->sendingMS = 0;
  pSender->term = pNode->pRaftStore->currentTerm;
  pSender->startTime = taosGetTimestampMs();
  pSender->lastSendTime = pSender->startTime;
  pSender->finish = false;

  // build the pre-snapshot message (no payload)
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotSend *pPreMsg = rpcMsg.pCont;
  pPreMsg->srcId = pSender->pSyncNode->myRaftId;
  pPreMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  pPreMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pPreMsg->beginIndex = pSender->snapshotParam.start;
  pPreMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pPreMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pPreMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pPreMsg->lastConfig = pSender->lastConfig;
  pPreMsg->startTime = pSender->startTime;
  pPreMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;

  // event log
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pPreMsg, "snapshot sender start");

  // hand the message to the transport
  if (syncNodeSendMsgById(&pPreMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  return 0;
}

129
// Stop the sender: clear the start flag, record the finish flag and end time,
// stop the FSM reader if one is open, and free the cached block.
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // flags and timestamp
  pSender->start = false;
  pSender->finish = finish;
  pSender->endTime = taosGetTimestampMs();

  // FSM reader
  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  // cached data block; nothing to reset when none is held
  if (pSender->pCurrentBlock == NULL) {
    return;
  }
  taosMemoryFree(pSender->pCurrentBlock);
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;
}

151
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
152
// seq = ack + 1, already updated
M
Minghao Li 已提交
153
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
154
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
155 156
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
157
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
158 159 160 161 162
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
S
Shengliang Guan 已提交
163 164 165 166 167 168
                                                           &pSender->pCurrentBlock, &pSender->blockLen);
  if (ret != 0) {
    sSError(pSender, "snapshot sender read failed since %s", terrstr());
    return -1;
  }

M
Minghao Li 已提交
169 170
  if (pSender->blockLen > 0) {
    // has read data
171
    sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
172
  } else {
173
    // read finish, update seq to end
M
Minghao Li 已提交
174
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
S
Shengliang Guan 已提交
175
    sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
176
  }
M
Minghao Li 已提交
177

M
Minghao Li 已提交
178
  // build msg
179
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
180 181 182 183
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
    return -1;
  }
184 185

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
186
  pMsg->srcId = pSender->pSyncNode->myRaftId;
187
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
M
Minghao Li 已提交
188
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
189
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
190 191
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
192 193
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
194
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
195

M
Minghao Li 已提交
196 197 198
  if (pSender->pCurrentBlock != NULL) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }
M
Minghao Li 已提交
199

200
  // event log
S
Shengliang Guan 已提交
201
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
S
Shengliang Guan 已提交
202
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
S
Shengliang Guan 已提交
203
  } else {
S
Shengliang Guan 已提交
204
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
S
Shengliang Guan 已提交
205
  }
206 207 208 209 210 211 212 213

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
M
Minghao Li 已提交
214 215 216
  return 0;
}

M
Minghao Li 已提交
217 218
// Resend the currently cached block (pSender->pCurrentBlock / blockLen) with
// the current seq; nothing is read from the FSM reader here.
// return 0 on success, -1 when the message cannot be built or sent
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  SSyncNode *pNode = pSender->pSyncNode;

  // allocate a message large enough for the cached block
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  // header
  SyncSnapshotSend *pResend = rpcMsg.pCont;
  pResend->srcId = pNode->myRaftId;
  pResend->destId = pNode->replicasId[pSender->replicaIndex];
  pResend->term = pNode->pRaftStore->currentTerm;
  pResend->beginIndex = pSender->snapshotParam.start;
  pResend->lastIndex = pSender->snapshot.lastApplyIndex;
  pResend->lastTerm = pSender->snapshot.lastApplyTerm;
  pResend->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pResend->lastConfig = pSender->lastConfig;
  pResend->seq = pSender->seq;

  // payload, only when a non-empty block is cached
  bool hasBlock = (pSender->pCurrentBlock != NULL) && (pSender->blockLen > 0);
  if (hasBlock) {
    memcpy(pResend->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // event log
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pResend, "snapshot sender resend");

  // send
  if (syncNodeSendMsgById(&pResend->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
  return 0;
}

S
Shengliang Guan 已提交
254 255 256 257 258 259 260
// Accept an ack from the receiver: the ack must equal the seq that was just
// sent. On success the ack is recorded and seq is advanced by one.
// return 0 on success; -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR on mismatch
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->ack == pSender->seq) {
    pSender->ack = pMsg->ack;
    pSender->seq++;

    sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
    return 0;
  }

  sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}

M
Minghao Li 已提交
268 269 270
// return 0, start ok
// return 1, last snapshot finish ok
// return -1, error
271
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
S
Shengliang Guan 已提交
272
  sNInfo(pSyncNode, "snapshot sender starting ...");
M
Minghao Li 已提交
273

274 275
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
S
Shengliang Guan 已提交
276
    sNError(pSyncNode, "snapshot sender start error since get failed");
M
Minghao Li 已提交
277
    return -1;
278 279
  }

M
Minghao Li 已提交
280
  if (snapshotSenderIsStart(pSender)) {
281
    sSInfo(pSender, "snapshot sender already start, ignore");
M
Minghao Li 已提交
282 283 284
    return 0;
  }

S
Shengliang Guan 已提交
285 286
  if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
    sSInfo(pSender, "snapshot sender start too frequently, ignore");
287
    return 0;
M
Minghao Li 已提交
288 289
  }

290
  sSInfo(pSender, "snapshot sender start");
291

S
Shengliang Guan 已提交
292
  int32_t code = snapshotSenderStart(pSender);
M
Minghao Li 已提交
293
  if (code != 0) {
S
Shengliang Guan 已提交
294
    sSError(pSender, "snapshot sender start error since %s", terrstr());
M
Minghao Li 已提交
295 296
    return -1;
  }
297 298 299

  return 0;
}
300

301
// Allocate and initialize a snapshot receiver for messages coming from fromId.
// return the new receiver, or NULL when one of the FSM write callbacks
// (start/stop/do write) is missing or when allocation fails
// (terrno = TSDB_CODE_OUT_OF_MEMORY in the latter case)
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  // all three write callbacks are required
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // state
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  // owner and peer
  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;
  pNew->term = pSyncNode->pRaftStore->currentTerm;

  // snapshot description is unknown until a transfer begins
  pNew->snapshot.data = NULL;
  pNew->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pNew->snapshot.lastApplyTerm = 0;
  pNew->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  return pNew;
}
M
Minghao Li 已提交
325

326
// Release a snapshot receiver. If a writer is still open it is stopped with
// the "apply" flag set to false, then the receiver is freed.
// A NULL receiver is a no-op.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer
  if (pReceiver->pWriter != NULL) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    stopCode = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot);
    if (stopCode != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
    }
    pReceiver->pWriter = NULL;
  }

  // free receiver
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
342

343 344
// Report whether the receiver's start flag is set.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

345
// force stop
346
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
S
Shengliang Guan 已提交
347 348
  sRInfo(pReceiver, "snapshot receiver force stop, writer:%p");

349 350
  // force close, abandon incomplete data
  if (pReceiver->pWriter != NULL) {
351
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
S
Shengliang Guan 已提交
352 353 354 355
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr());
    }
356 357 358 359 360 361
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
}

362
// Open an FSM snapshot writer for the transfer described by pBeginMsg.
// Resets ack to SYNC_SNAPSHOT_SEQ_BEGIN and copies the snapshot description
// (last index/term/config index, begin index) from the message.
//
// return 0 on success; -1 when a writer is already open
// (terrno = TSDB_CODE_SYN_INTERNAL_ERROR) or FpSnapshotStartWrite fails
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (pReceiver->pWriter != NULL) {
    // the old format carried a "vgId:%d" conversion with no matching argument
    sRError(pReceiver, "snapshot receiver writer is not null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update ack
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // update snapshot
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // start writer
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
                                                                 &pReceiver->pWriter);
  if (ret != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
    return -1;
  }

  // event log
  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}

392
// Mark the receiver as started for the transfer announced by pPreMsg: ack is
// set to SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT, and the sender id and start time are
// taken from the message. Does nothing if the receiver is already started.
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (!snapshotReceiverIsStart(pReceiver)) {
    pReceiver->start = true;
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
    pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
    pReceiver->fromId = pPreMsg->srcId;
    pReceiver->startTime = pPreMsg->startTime;

    // event log
    sRInfo(pReceiver, "snapshot receiver is start");
    return;
  }

  sRInfo(pReceiver, "snapshot receiver has started");
}
M
Minghao Li 已提交
407

408 409
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
410
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
S
Shengliang Guan 已提交
411 412
  sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

M
Minghao Li 已提交
413
  if (pReceiver->pWriter != NULL) {
414
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
S
Shengliang Guan 已提交
415 416 417 418
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
    }
M
Minghao Li 已提交
419
    pReceiver->pWriter = NULL;
S
Shengliang Guan 已提交
420 421
  } else {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
422
  }
M
Minghao Li 已提交
423 424

  pReceiver->start = false;
425 426
}

427
// when recv last snapshot block, apply data into snapshot
428 429
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;
430
  if (pReceiver->pWriter != NULL) {
431
    // write data
S
Shengliang Guan 已提交
432
    sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
433 434 435
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
436
      if (code != 0) {
S
Shengliang Guan 已提交
437
        sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
438 439 440 441 442
        return -1;
      }
    }

    // reset wal
S
Shengliang Guan 已提交
443
    sRInfo(pReceiver, "snapshot receiver log restore");
444 445 446
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
S
Shengliang Guan 已提交
447
      sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
448
      return -1;
449 450
    }

451 452 453 454 455
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

M
Minghao Li 已提交
456 457 458 459 460 461
    // maybe update term
    if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
      pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
      raftStorePersist(pReceiver->pSyncNode->pRaftStore);
    }

462
    // stop writer, apply data
S
Shengliang Guan 已提交
463
    sRInfo(pReceiver, "snapshot receiver apply write");
464
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
S
Shengliang Guan 已提交
465
                                                           &pReceiver->snapshot);
466
    if (code != 0) {
S
Shengliang Guan 已提交
467
      sRError(pReceiver, "snapshot receiver apply failed  since %s", terrstr());
468 469
      return -1;
    }
470 471
    pReceiver->pWriter = NULL;

472 473
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
474

475
  } else {
S
Shengliang Guan 已提交
476
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
477
    return -1;
478 479 480
  }

  // event log
S
Shengliang Guan 已提交
481
  sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
482
  return 0;
483 484
}

485 486
// apply data block
// update progress
S
Shengliang Guan 已提交
487 488 489
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
490
    terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
S
Shengliang Guan 已提交
491 492
    return -1;
  }
493

S
Shengliang Guan 已提交
494 495 496 497 498
  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
499

S
Shengliang Guan 已提交
500
  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
501

S
Shengliang Guan 已提交
502 503 504 505 506 507 508 509
  if (pMsg->dataLen > 0) {
    // apply data block
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                 pMsg->data, pMsg->dataLen);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
      return -1;
    }
510
  }
S
Shengliang Guan 已提交
511 512 513 514 515 516 517

  // update progress
  pReceiver->ack = pMsg->seq;

  // event log
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
518 519
}

M
Minghao Li 已提交
520 521 522 523 524
// Compute the index at which a snapshot sent to this node should begin.
// For an mnode this is SYNC_INDEX_BEGIN. Otherwise it is commitIndex + 1,
// except when the log is non-empty and the WAL's committed version differs
// from commitIndex, in which case walCommitVer + 1 is used.
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    SyncIndex snapStart = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
    return snapStart;
  }

  SSyncLogStoreData *pData = ths->pLogStore->data;
  SWal              *pWal = pData->pWal;

  bool    isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
  int64_t walCommitVer = walGetCommittedVer(pWal);

  SyncIndex snapStart = SYNC_INDEX_INVALID;
  if (isEmpty || ths->commitIndex == walCommitVer) {
    snapStart = ths->commitIndex + 1;
  } else {
    // the two commit positions disagree: report it and follow the WAL
    sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer,
            ths->commitIndex);
    snapStart = walCommitVer + 1;
  }

  sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
  return snapStart;
}

// Handle a pre-snapshot message (seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT).
//
// If the receiver is not started, or the message carries a newer startTime
// than the running receiver, the receiver is (re)started with the sender's
// startTime. An equal startTime only triggers a reply; an older startTime is
// answered with TSDB_CODE_SYN_INTERNAL_ERROR. A message whose startTime lags
// the local clock by more than SNAPSHOT_MAX_CLOCK_SKEW_MS is rejected too.
//
// A reply is sent in every case, carrying the resulting code and the snapshot
// begin index computed by syncNodeGetSnapBeginIndex().
//
// return the reply code (0 on success), or -1 when the reply cannot be built
// or sent
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int64_t                timeNow = taosGetTimestampMs();
  int32_t                code = 0;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    if (pMsg->startTime > pReceiver->startTime) {
      // the message belongs to a newer transfer than the running one
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " start receiver",
             pReceiver->startTime, pMsg->startTime);
      goto _START_RECEIVER;
    } else if (pMsg->startTime == pReceiver->startTime) {
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
             pReceiver->startTime, pMsg->startTime);
      goto _SEND_REPLY;
    } else {
      // stale message from an older transfer: ignore it, but still reply
      sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " ignore",
              pReceiver->startTime, pMsg->startTime);
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      code = terrno;
      goto _SEND_REPLY;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
    goto _START_RECEIVER;
  }

_START_RECEIVER:
  if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
    sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
            pMsg->startTime);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    code = terrno;
  } else {
    // waiting for clock match: block until the local clock reaches startTime
    while (timeNow < pMsg->startTime) {
      sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
             pMsg->startTime);
      taosMsleep(10);
      timeNow = taosGetTimestampMs();
    }

    if (snapshotReceiverIsStart(pReceiver)) {
      sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
      snapshotReceiverForceStop(pReceiver);
    }

    snapshotReceiverStart(pReceiver, pMsg);  // set start-time same with sender
  }

_SEND_REPLY:
    // build msg
    ;  // empty statement: a label cannot be followed directly by a declaration

  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pMsg->seq;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver failed to send resp since %s", terrstr());
    return -1;
  }

  return code;
}

// Handle the begin message of a snapshot transfer: the receiver must already
// be started with the same startTime as the message, after which the FSM
// writer is opened. A reply carrying the outcome is sent in every case.
//
// return the reply code (0 on success; on failure terrno when it is set,
// otherwise TSDB_CODE_SYN_INTERNAL_ERROR), or -1 when the reply cannot be
// built or sent
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  // each failed precondition is logged and leaves the default error code
  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "snapshot receiver begin failed since not start");
  } else if (pReceiver->startTime != pMsg->startTime) {
    sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
            pReceiver->startTime, pMsg->startTime);
  } else if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) {
    sRError(pReceiver, "snapshot receiver begin failed since start writer failed");
  } else {
    code = 0;
  }

  // prefer the specific error recorded in terrno, when there is one
  if (code != 0 && terrno != 0) {
    code = terrno;
  }

  // build the reply
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRsp = rpcMsg.pCont;
  pRsp->srcId = pSyncNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pSyncNode->pRaftStore->currentTerm;
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pReceiver->startTime;
  pRsp->ack = pReceiver->ack;  // receiver maybe already closed
  pRsp->code = code;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;

  // send the reply
  syncLogSendSyncSnapshotRsp(pSyncNode, pRsp, "snapshot receiver begin");
  if (syncNodeSendMsgById(&pRsp->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return code;
}

686
// Handle a data message during a snapshot transfer: wait until the local clock
// has reached the message's startTime, write the block through the receiver,
// then reply with the receiver's current ack and the resulting code.
//
// return the reply code (0 on success), or -1 when the reply cannot be built
// or sent
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // waiting for clock match
  for (int64_t timeNow = taosGetTimestampMs(); timeNow < pMsg->startTime; timeNow = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
           pMsg->startTime);
    taosMsleep(10);
  }

  int32_t code = 0;
  if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
    code = terrno;
    // NOTE(review): presumably keeps the reply code from overlapping the range
    // used by snapshot sequence values - confirm against the macro definitions
    if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // build the reply
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRsp = rpcMsg.pCont;
  pRsp->srcId = pSyncNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pSyncNode->pRaftStore->currentTerm;
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pReceiver->startTime;
  pRsp->ack = pReceiver->ack;  // receiver maybe already closed
  pRsp->code = code;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;

  // send the reply
  syncLogSendSyncSnapshotRsp(pSyncNode, pRsp, "snapshot receiver received");
  if (syncNodeSendMsgById(&pRsp->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return code;
}

M
Minghao Li 已提交
736 737 738 739 740 741 742 743
// Receiver-side handling of the SYNC_SNAPSHOT_SEQ_END message.
//
// Waits until the local clock has reached the start time carried in pMsg,
// asks the receiver to finish (stopping it when finishing succeeds), and then
// reports the outcome back to the sender in a SyncSnapshotRsp.
//
// Returns the result of snapshotReceiverFinish() once the response has been
// sent, or -1 if the response could not be built or sent.
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // poll in 10 ms steps until local time is no longer behind the sender's start time
  for (int64_t nowMs = taosGetTimestampMs(); nowMs < pMsg->startTime; nowMs = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, nowMs,
           pMsg->startTime);
    taosMsleep(10);
  }

  // finish the transfer; the receiver is only stopped when finishing succeeded
  int32_t code = snapshotReceiverFinish(pReceiver, pMsg);
  if (code == 0) snapshotReceiverStop(pReceiver);

  // allocate the response message
  SRpcMsg rspRpc = {0};
  if (syncBuildSnapshotSendRsp(&rspRpc, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
    return -1;
  }

  // fill it in: addressing first, then the snapshot position echoed from the request
  SyncSnapshotRsp *pRsp = rspRpc.pCont;
  pRsp->srcId = pSyncNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pSyncNode->pRaftStore->currentTerm;
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pReceiver->startTime;
  pRsp->ack = pReceiver->ack;  // receiver maybe already closed
  pRsp->code = code;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;

  // log and send the response
  syncLogSendSyncSnapshotRsp(pSyncNode, pRsp, "snapshot receiver end");
  if (syncNodeSendMsgById(&pRsp->destId, pSyncNode, &rspRpc) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
    return -1;
  }

  return code;
}
M
Minghao Li 已提交
782

783 784
// receiver on message
//
M
Minghao Li 已提交
785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
//              if receiver has already started
//                    if sender.start-time > receiver.start-time, restart receiver (reply snapshot start)
//                    if sender.start-time = receiver.start-time, possibly a duplicate msg
//                    if sender.start-time < receiver.start-time, ignore
//              else
//                    wait for clock match
//                    start receiver (reply snapshot start)
//
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
//              a. create writer with <begin, end>
//
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver (apply snapshot data, update commit index, maybe reconfig)
//
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
//
// condition 5, got data, update ack
M
Minghao Li 已提交
802
//
803
// Receiver-side entry point for a SyncSnapshotSend message.
//
// Rejects messages from nodes outside the raft group or with a smaller term,
// steps down when the message carries a larger term, resets the election
// timer, and then dispatches on pMsg->seq. Dispatch only happens while this
// node is a follower whose current term equals the message term.
//
// Returns the result of the selected handler, 0 for a force-close message,
// or -1 when the message is rejected.
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
  SyncSnapshotSend      *pMsg = pRpcMsg->pCont;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // the sender is not (or no longer) part of our raft group: do not process
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // message from an older term
  if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // message from a newer term
  if (pMsg->term > pSyncNode->pRaftStore->currentTerm) {
    syncNodeStepDown(pSyncNode, pMsg->term);
  }
  syncNodeResetElectTimer(pSyncNode);

  // state check
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    sRError(pReceiver, "snapshot receiver not follower");
    return -1;
  }

  // term check; currentTerm is read again here because the step-down above runs first
  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    sRError(pReceiver, "snapshot receiver term not equal");
    return -1;
  }

  // seq dispatch
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
    return syncNodeOnSnapshotPre(pSyncNode, pMsg);
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
    return syncNodeOnSnapshotBegin(pSyncNode, pMsg);
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
    int32_t endCode = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
    // the log buffer is re-initialized whatever the end handler returned
    if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
      sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
      endCode = -1;
    }
    return endCode;
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
    // force close, no response
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
    snapshotReceiverForceStop(pReceiver);
    return 0;
  }

  if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
    return syncNodeOnSnapshotReceive(pSyncNode, pMsg);
  }

  // none of the known seq values matched
  sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
  return -1;
}

868
// Sender-side handling of the receiver's reply to the pre-snapshot message.
//
// Reads the current snapshot info from the FSM, records the range
// <pMsg->snapBeginIndex, snapshot.lastApplyIndex> in the sender, starts the
// FSM snapshot reader for that range, moves the peer's next index to
// lastApplyIndex + 1, and sends the SYNC_SNAPSHOT_SEQ_BEGIN message.
//
// pSyncNode: the local sync node
// pSender:   the snapshot sender associated with the replying replica
// pMsg:      the response received from the receiver
//
// Returns 0 on success. Returns -1 when the receiver's begin index is beyond
// the snapshot's last applied index, when the reader cannot be started, or
// when the begin message cannot be built or sent.
static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // prepare <begin, end>
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapshot.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);

  if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
    // both indexes are 64-bit (see the sSInfo above), so they must be printed with PRId64, not %d
    sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
            pMsg->snapBeginIndex, snapshot.lastApplyIndex);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update sender
  pSender->snapshot = snapshot;

  // start reader
  int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code != 0) {
    sSError(pSender, "prepare snapshot failed since %s", terrstr());
    return -1;
  }

  // update next index
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);

  // update seq
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // build begin msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "prepare snapshot failed since build msg error");
    return -1;
  }

  SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
  pSendMsg->srcId = pSender->pSyncNode->myRaftId;
  pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pSendMsg->beginIndex = pSender->snapshotParam.start;
  pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pSendMsg->lastConfig = pSender->lastConfig;
  pSendMsg->startTime = pSender->startTime;
  pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // send msg
  syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
  if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "prepare snapshot failed since send msg error");
    return -1;
  }

  return 0;
}

931 932 933 934 935 936
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
S
Shengliang Guan 已提交
937
// Sender-side entry point for a SyncSnapshotRsp message.
//
// After validating the message (source still in the raft group, a sender
// exists for it, this node is leader, start time and term match, no error
// code from the receiver), the ack value selects the action:
//   - SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT: prepare the range and send the begin msg
//   - SYNC_SNAPSHOT_SEQ_BEGIN:        update progress and send data
//   - SYNC_SNAPSHOT_SEQ_END:          stop the sender and reset the log repl mgr
//   - ack == seq:                     update progress and send the next msg
//   - ack == seq - 1:                 resend
//   - anything else:                  treated as an error
//
// Returns 0 on success and -1 on failure. Validation failures and an
// unexpected ack also stop the sender and reset the peer's log replication
// manager, with terrno set.
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
  SyncSnapshotRsp *pMsg = pRpcMsg->pCont;

  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // get sender
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // state, term, seq/ack
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
    sSError(pSender, "snapshot sender not leader");
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    goto _ERROR;
  }

  if (pMsg->startTime != pSender->startTime) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
    // pMsg comes from the receiver, so its startTime is the receiver's value
    sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pSender->startTime,
            pMsg->startTime, pMsg->code);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _ERROR;
  }

  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
    sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
            pSyncNode->pRaftStore->currentTerm);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _ERROR;
  }

  if (pMsg->code != 0) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
    sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code);
    terrno = pMsg->code;
    goto _ERROR;
  }

  // prepare <begin, end>, send begin msg
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
    return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg);
  }

  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
      return -1;
    }

    if (snapshotSend(pSender) != 0) {
      return -1;
    }
    return 0;
  }

  // receive ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
    syncLogReplMgrReset(pMgr);
    return 0;
  }

  // send next msg
  if (pMsg->ack == pSender->seq) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
    // update sender ack
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
      return -1;
    }
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
  } else if (pMsg->ack == pSender->seq - 1) {
    // maybe resend
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
    if (snapshotReSend(pSender) != 0) {
      return -1;
    }
  } else {
    // unexpected ack: same cleanup as every other validation failure
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
    sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _ERROR;
  }

  return 0;

_ERROR:
  // shared failure path: stop the sender and reset log replication for this peer
  snapshotSenderStop(pSender, true);
  SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
  syncLogReplMgrReset(pMgr);

  return -1;
}