syncSnapshot.c 35.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21
#include "syncRaftLog.h"
M
Minghao Li 已提交
22
#include "syncRaftStore.h"
M
Minghao Li 已提交
23
#include "syncReplication.h"
M
Minghao Li 已提交
24
#include "syncUtil.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26
/*
 * Allocate a snapshot sender bound to `pSyncNode` and the replica at
 * `replicaIndex`, and fill its snapshot info through FpGetSnapshotInfo.
 *
 * Returns NULL when the FSM is missing any of the snapshot read callbacks
 * (FpSnapshotStartRead / FpSnapshotStopRead / FpSnapshotDoRead), or when the
 * allocation fails (terrno is then set to TSDB_CODE_OUT_OF_MEMORY).
 */
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // all three read callbacks are required
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    return NULL;
  }

  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // identity of the sender
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
  pSender->term = pSyncNode->pRaftStore->currentTerm;

  // transfer progress
  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;

  // timing
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->startTime = 0;
  pSender->endTime = 0;

  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &pSender->snapshot);
  pSender->finish = false;

  sDebug("vgId:%d, snapshot sender create", pSyncNode->vgId);
  return pSender;
}
M
Minghao Li 已提交
55

M
Minghao Li 已提交
56
/*
 * Release a snapshot sender: free the cached block, stop the FSM reader if
 * one is open, then free the sender itself. A NULL sender is ignored.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  SSyncNode *pNode = pSender->pSyncNode;
  sDebug("vgId:%d, snapshot sender destroy", pNode->vgId);

  // release the block kept for resending
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // hand the reader back to the FSM
  if (pSender->pReader != NULL) {
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
75

M
Minghao Li 已提交
76 77
// Return the sender's `start` flag (set by snapshotSenderStart, cleared by snapshotSenderStop).
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

M
Minghao Li 已提交
78
/*
 * Start a snapshot transfer: mark the sender as started, reset its progress
 * and snapshot fields, and send a message with seq
 * SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT to the target replica.
 *
 * Returns 0 on success. Returns -1 when the sender is already started
 * (terrno = TSDB_CODE_SYN_INTERNAL_ERROR) or when building/sending the
 * message fails.
 *
 * NOTE(review): on a build/send failure the sender is left with start == true;
 * confirm that callers rely on snapshotSenderStop to recover.
 */
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  if (snapshotSenderIsStart(pSender)) {
    // no conversion specifier here: the former "vgId:%d" had no matching argument
    sSError(pSender, "snapshot sender is already start");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // reset transfer progress
  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;

  // reset snapshot description; the begin index is filled in later
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  memset(&(pSender->lastConfig), 0, sizeof(pSender->lastConfig));
  pSender->sendingMS = 0;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pSender->startTime = taosGetTimestampMs();
  pSender->lastSendTime = pSender->startTime;
  pSender->finish = false;

  // build the pre-snapshot msg (no payload)
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->startTime = pSender->startTime;
  pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;

  // event log
  sSDebug(pSender, "snapshot sender start");
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  return 0;
}

138
/*
 * Stop the sender: clear the `start` flag, record `finish` and the end time,
 * stop the FSM reader if one is open and free the cached block.
 * Always returns 0.
 */
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // record the final state
  pSender->start = false;
  pSender->finish = finish;
  pSender->endTime = taosGetTimestampMs();

  // give the reader back to the FSM
  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  // drop the block kept for resending
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->blockLen = 0;
    pSender->pCurrentBlock = NULL;
  }

  return 0;
}

162
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
163
// seq = ack + 1, already updated
M
Minghao Li 已提交
164
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
165
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
166 167
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
168
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
169 170 171 172 173
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
S
Shengliang Guan 已提交
174 175 176 177 178 179
                                                           &pSender->pCurrentBlock, &pSender->blockLen);
  if (ret != 0) {
    sSError(pSender, "snapshot sender read failed since %s", terrstr());
    return -1;
  }

M
Minghao Li 已提交
180
  if (pSender->blockLen > 0) {
S
Shengliang Guan 已提交
181
    sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
182 183
    // has read data
  } else {
184
    // read finish, update seq to end
M
Minghao Li 已提交
185
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
S
Shengliang Guan 已提交
186
    sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
187
  }
M
Minghao Li 已提交
188

M
Minghao Li 已提交
189
  // build msg
190
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
191 192 193 194
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
    return -1;
  }
195 196

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
197 198 199
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
200
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
201 202
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
203 204
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
205
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
206 207
  // pMsg->privateTerm = pSender->privateTerm;

M
Minghao Li 已提交
208 209 210
  if (pSender->pCurrentBlock != NULL) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }
M
Minghao Li 已提交
211

212
  // event log
S
Shengliang Guan 已提交
213
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
S
Shengliang Guan 已提交
214 215
    sSDebug(pSender, "snapshot sender finish, seq:%d", pSender->seq);
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
S
Shengliang Guan 已提交
216
  } else {
S
Shengliang Guan 已提交
217 218
    sSDebug(pSender, "snapshot sender sending, seq:%d", pSender->seq);
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
S
Shengliang Guan 已提交
219
  }
220 221 222 223 224 225 226 227

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
M
Minghao Li 已提交
228 229 230
  return 0;
}

M
Minghao Li 已提交
231 232
/*
 * Send the cached block (pCurrentBlock / blockLen) again with the current
 * seq, without reading new data from the FSM.
 *
 * Returns 0 on success, -1 when building or sending the message fails.
 * lastSendTime is refreshed only on success.
 */
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  SSyncNode *pNode = pSender->pSyncNode;

  // allocate a message large enough for the cached block
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  // routing and term
  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pNode->myRaftId;
  pMsg->destId = (pNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pNode->pRaftStore->currentTerm;

  // snapshot description
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;

  // payload, if any is cached
  if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // event log
  sSDebug(pSender, "snapshot sender resend, seq:%d", pSender->seq);
  syncLogSendSyncSnapshotSend(pNode, pMsg, "snapshot sender resend");

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
  return 0;
}

S
Shengliang Guan 已提交
270 271 272 273 274 275 276
/*
 * Advance the sender after a response: when the acked seq equals the seq
 * last sent, record the ack and move seq forward by one (returns 0).
 * Any other ack is rejected with terrno = TSDB_CODE_SYN_INTERNAL_ERROR
 * and a return value of -1.
 */
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->ack == pSender->seq) {
    pSender->ack = pMsg->ack;
    pSender->seq++;

    sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
    return 0;
  }

  sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}

M
Minghao Li 已提交
284 285 286
// return 0, start ok
// return 1, last snapshot finish ok
// return -1, error
287
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
S
Shengliang Guan 已提交
288
  sNInfo(pSyncNode, "snapshot sender starting ...");
M
Minghao Li 已提交
289

290 291
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
S
Shengliang Guan 已提交
292
    sNError(pSyncNode, "snapshot sender start error since get failed");
M
Minghao Li 已提交
293
    return -1;
294 295
  }

M
Minghao Li 已提交
296
  if (snapshotSenderIsStart(pSender)) {
S
Shengliang Guan 已提交
297
    sSError(pSender, "snapshot sender already start, ignore");
M
Minghao Li 已提交
298 299 300
    return 0;
  }

S
Shengliang Guan 已提交
301 302
  if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
    sSInfo(pSender, "snapshot sender start too frequently, ignore");
M
Minghao Li 已提交
303
    return 1;
M
Minghao Li 已提交
304 305
  }

306 307 308
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
309
  sSInfo(pSender, "snapshot sender start for peer:%s:%u", host, port);
310

S
Shengliang Guan 已提交
311
  int32_t code = snapshotSenderStart(pSender);
M
Minghao Li 已提交
312
  if (code != 0) {
S
Shengliang Guan 已提交
313
    sSError(pSender, "snapshot sender start error since %s", terrstr());
M
Minghao Li 已提交
314 315
    return -1;
  }
316 317 318

  return 0;
}
319

320
/*
 * Allocate a snapshot receiver bound to `pSyncNode`, expecting data from
 * `fromId`.
 *
 * Returns NULL when the FSM is missing any of the snapshot write callbacks
 * (FpSnapshotStartWrite / FpSnapshotStopWrite / FpSnapshotDoWrite), or when
 * the allocation fails (terrno is then set to TSDB_CODE_OUT_OF_MEMORY).
 */
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  // all three write callbacks are required
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    return NULL;
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // identity of the receiver
  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = pSyncNode->pRaftStore->currentTerm;

  // transfer progress
  pReceiver->start = false;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;

  // snapshot description, unknown until a transfer begins
  pReceiver->snapshot.data = NULL;
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pReceiver->snapshot.lastApplyTerm = 0;
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  sDebug("vgId:%d, snapshot receiver create", pSyncNode->vgId);
  return pReceiver;
}
M
Minghao Li 已提交
345

346
/*
 * Release a snapshot receiver. If a writer is still open it is stopped with
 * the apply flag set to false; a failure of that stop is only logged.
 * A NULL receiver is ignored.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  SSyncNode *pNode = pReceiver->pSyncNode;
  sDebug("vgId:%d, snapshot receiver destroy", pNode->vgId);

  // close the writer without applying what was received
  if (pReceiver->pWriter != NULL) {
    if (pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot) != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
    }
    pReceiver->pWriter = NULL;
  }

  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
363

364 365
// Return the receiver's `start` flag (set by snapshotReceiverStart, cleared by the stop functions).
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

366
// force stop
367
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
S
Shengliang Guan 已提交
368 369
  sRInfo(pReceiver, "snapshot receiver force stop, writer:%p");

370 371
  // force close, abandon incomplete data
  if (pReceiver->pWriter != NULL) {
M
Minghao Li 已提交
372
    // event log
373
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
S
Shengliang Guan 已提交
374 375 376 377
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr());
    }
378 379 380 381 382 383
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
}

M
Minghao Li 已提交
384
/*
 * Open the FSM snapshot writer for a transfer described by `pBeginMsg`.
 *
 * Requires the receiver to be started and to have no writer yet; otherwise
 * returns -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR. On the normal path
 * the ack is reset to SYNC_SNAPSHOT_SEQ_BEGIN and the snapshot description
 * is copied from the message before FpSnapshotStartWrite is called.
 *
 * Returns 0 on success, -1 on any failure (including FpSnapshotStartWrite).
 */
int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "snapshot receiver is not start");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  if (pReceiver->pWriter != NULL) {
    // no conversion specifier here: the former "vgId:%d" had no matching argument
    sRError(pReceiver, "snapshot receiver writer is not null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update ack
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // update snapshot
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // start writer
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
                                                                 &pReceiver->pWriter);
  if (ret != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
    return -1;
  }

  // event log
  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}

/*
 * Mark the receiver as started for the transfer announced by `pPreMsg`:
 * the ack becomes SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT and the sender id and
 * start time are taken from the message. Does nothing if the receiver is
 * already started. Always returns 0.
 */
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (!snapshotReceiverIsStart(pReceiver)) {
    // adopt the sender's identity and start time
    pReceiver->fromId = pPreMsg->srcId;
    pReceiver->startTime = pPreMsg->startTime;
    pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
    pReceiver->start = true;

    // event log
    sRInfo(pReceiver, "snapshot receiver is start");
  } else {
    sRInfo(pReceiver, "snapshot receiver has started");
  }

  return 0;
}
M
Minghao Li 已提交
436

437 438
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
439
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
S
Shengliang Guan 已提交
440 441
  sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

M
Minghao Li 已提交
442
  if (pReceiver->pWriter != NULL) {
443
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
S
Shengliang Guan 已提交
444 445 446 447
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
    }
M
Minghao Li 已提交
448
    pReceiver->pWriter = NULL;
S
Shengliang Guan 已提交
449 450
  } else {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
451
  }
M
Minghao Li 已提交
452 453

  pReceiver->start = false;
454
  return 0;
455 456
}

457
// when recv last snapshot block, apply data into snapshot
458
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
S
Shengliang Guan 已提交
459 460 461 462 463
  if (pMsg->seq != SYNC_SNAPSHOT_SEQ_END) {
    sRError(pReceiver, "snapshot receiver seq:%d is invalid", pMsg->seq);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
464

465
  int32_t code = 0;
466
  if (pReceiver->pWriter != NULL) {
467
    // write data
S
Shengliang Guan 已提交
468
    sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
469 470 471
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
472
      if (code != 0) {
S
Shengliang Guan 已提交
473
        sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
474 475 476 477 478
        return -1;
      }
    }

    // reset wal
S
Shengliang Guan 已提交
479
    sRInfo(pReceiver, "snapshot receiver log restore");
480 481 482
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
S
Shengliang Guan 已提交
483
      sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
484
      return -1;
485 486
    }

487 488 489 490 491
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

M
Minghao Li 已提交
492 493 494 495 496 497
    // maybe update term
    if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
      pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
      raftStorePersist(pReceiver->pSyncNode->pRaftStore);
    }

498
    // stop writer, apply data
S
Shengliang Guan 已提交
499
    sRInfo(pReceiver, "snapshot receiver apply write");
500
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
S
Shengliang Guan 已提交
501
                                                           &pReceiver->snapshot);
502
    if (code != 0) {
S
Shengliang Guan 已提交
503
      sRError(pReceiver, "snapshot receiver apply failed  since %s", terrstr());
504 505
      return -1;
    }
506 507
    pReceiver->pWriter = NULL;

508 509
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
510

511
  } else {
S
Shengliang Guan 已提交
512
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
513
    return -1;
514 515 516
  }

  // event log
S
Shengliang Guan 已提交
517
  sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
518
  return 0;
519 520
}

521 522
// apply data block
// update progress
S
Shengliang Guan 已提交
523 524 525 526 527 528
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
529

S
Shengliang Guan 已提交
530 531 532 533 534
  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
535

S
Shengliang Guan 已提交
536
  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
537

S
Shengliang Guan 已提交
538 539 540 541 542 543 544 545
  if (pMsg->dataLen > 0) {
    // apply data block
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                 pMsg->data, pMsg->dataLen);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
      return -1;
    }
546
  }
S
Shengliang Guan 已提交
547 548 549 550 551 552 553

  // update progress
  pReceiver->ack = pMsg->seq;

  // event log
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
554 555
}

M
Minghao Li 已提交
556 557 558 559 560
/*
 * Compute the index at which a snapshot sent to this node should begin.
 *
 * For an mnode the result is SYNC_INDEX_BEGIN. Otherwise it is
 * commitIndex + 1, except when the log store is not empty and the WAL's
 * committed version differs from commitIndex: then the mismatch is logged
 * and the WAL's committed version + 1 is used.
 */
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  SyncIndex snapStart = SYNC_INDEX_INVALID;

  if (syncNodeIsMnode(ths)) {
    snapStart = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
    return snapStart;
  }

  SSyncLogStoreData *pData = ths->pLogStore->data;
  SWal              *pWal = pData->pWal;

  bool    isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
  int64_t walCommitVer = walGetCommittedVer(pWal);
  bool    commitMismatch = !isEmpty && (ths->commitIndex != walCommitVer);

  if (commitMismatch) {
    // trust the WAL when the two commit positions disagree
    sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer,
            ths->commitIndex);
    snapStart = walCommitVer + 1;
  } else {
    snapStart = ths->commitIndex + 1;
  }

  sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
  return snapStart;
}

/*
 * Handle a pre-snapshot message.
 *
 * Decision on the receiver (pSyncNode->pNewNodeReceiver):
 *   - not started, or started with an older startTime than the message:
 *     (re)start it with the message's startTime, then reply;
 *   - started with the same startTime: only reply;
 *   - started with a newer startTime: ignore the message and return 0.
 *
 * Before (re)starting, the message is rejected with -1 if the local clock is
 * more than SNAPSHOT_MAX_CLOCK_SKEW_MS ahead of the message's startTime; if
 * the local clock is behind, the function sleeps in 10 ms steps until it has
 * caught up. A receiver that is still running is force-stopped first.
 *
 * The reply acks pMsg->seq and carries syncNodeGetSnapBeginIndex().
 * Returns 0 on success, -1 when the skew check, building or sending fails.
 */
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int64_t                timeNow = taosGetTimestampMs();
  bool                   needStart = true;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    if (pMsg->startTime > pReceiver->startTime) {
      // the message belongs to a newer transfer than the one in progress
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " start receiver",
             pReceiver->startTime, pMsg->startTime);
    } else if (pMsg->startTime == pReceiver->startTime) {
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
             pReceiver->startTime, pMsg->startTime);
      needStart = false;
    } else {
      // stale message from an older transfer: ignore
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " ignore",
             pReceiver->startTime, pMsg->startTime);
      return 0;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
  }

  if (needStart) {
    if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
      sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
              pMsg->startTime);
      return -1;
    }

    // waiting for clock match
    while (timeNow < pMsg->startTime) {
      sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
             pMsg->startTime);
      taosMsleep(10);
      timeNow = taosGetTimestampMs();
    }

    if (snapshotReceiverIsStart(pReceiver)) {
      sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
      snapshotReceiverForceStop(pReceiver);
    }

    snapshotReceiverStart(pReceiver, pMsg);  // set start-time same with sender
  }

  // build msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pMsg->seq;  // receiver maybe already closed
  pRspMsg->code = 0;
  pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver failed to send resp since %s", terrstr());
    return -1;
  }

  return 0;
}

/*
 * Handle the begin message of a snapshot transfer.
 *
 * The receiver must already be started with the same startTime as the
 * message; otherwise -1 is returned. The writer is then opened and a
 * response carrying the receiver's current ack and begin index is sent.
 *
 * Returns 0 on success, -1 when a precondition fails or the response cannot
 * be built or sent.
 */
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // the pre-snapshot handshake must have started this receiver
  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "snapshot receiver not start");
    return -1;
  }

  // the message must belong to the transfer the receiver was started for
  if (pMsg->startTime != pReceiver->startTime) {
    sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
            pReceiver->startTime, pMsg->startTime);
    return -1;
  }

  // NOTE(review): the result is deliberately not checked here, so a response
  // is sent even when the writer could not be opened (e.g. it already
  // exists) — confirm whether a failure should abort instead.
  (void)snapshotReceiverStartWriter(pReceiver, pMsg);

  // allocate the response
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  // routing and term
  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;

  // echo the snapshot identity and report receiver progress
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = 0;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return 0;
}

/*
 * Receiver-side handling of a snapshot data message (a sequence number
 * strictly between BEGIN and END).
 *
 * Blocks in 10 ms steps until the local clock has reached the message's
 * startTime, hands the message to snapshotReceiverGotData(), and then replies
 * with a SyncSnapshotRsp carrying the receiver's current ack.
 *
 * Returns 0 when the response was sent, -1 on failure.
 */
static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // wait until the local clock catches up with the sender's start time
  for (int64_t timeNow = taosGetTimestampMs(); timeNow < pMsg->startTime; timeNow = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
           pMsg->startTime);
    taosMsleep(10);
  }

  // consume the data block
  if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
    return -1;
  }

  // allocate the response
  SRpcMsg rspRpc = {0};
  if (syncBuildSnapshotSendRsp(&rspRpc, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRsp = rspRpc.pCont;

  // routing and term
  pRsp->srcId = pSyncNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pSyncNode->pRaftStore->currentTerm;

  // echo the snapshot position carried by the request
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;

  // receiver progress; the receiver may already be closed at this point
  pRsp->startTime = pReceiver->startTime;
  pRsp->ack = pReceiver->ack;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;
  pRsp->code = 0;

  // log, then send
  syncLogSendSyncSnapshotRsp(pSyncNode, pRsp, "snapshot receiver receiving");
  if (syncNodeSendMsgById(&pRsp->destId, pSyncNode, &rspRpc) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return 0;
}

M
Minghao Li 已提交
756 757 758 759 760 761 762 763
/*
 * Receiver-side handling of the snapshot "end" message.
 *
 * Blocks in 10 ms steps until the local clock has reached the message's
 * startTime, asks the receiver to finish, and stops the receiver only when
 * finishing succeeded. A SyncSnapshotRsp is sent back in either case; its
 * `code` field is always 0 and the outcome of the finish step is not reflected
 * in this function's return value.
 *
 * Returns 0 when the response was sent, -1 when building or sending it failed.
 */
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // wait until the local clock catches up with the sender's start time
  for (int64_t timeNow = taosGetTimestampMs(); timeNow < pMsg->startTime; timeNow = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
           pMsg->startTime);
    taosMsleep(10);
  }

  // finish the receiver; stop it only after a successful finish
  if (snapshotReceiverFinish(pReceiver, pMsg) == 0) {
    snapshotReceiverStop(pReceiver);
  }

  // allocate the response
  SRpcMsg rspRpc = {0};
  if (syncBuildSnapshotSendRsp(&rspRpc, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRsp = rspRpc.pCont;

  // routing and term
  pRsp->srcId = pSyncNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pSyncNode->pRaftStore->currentTerm;

  // echo the snapshot position carried by the request
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;

  // receiver progress; the receiver may already be closed at this point
  pRsp->startTime = pReceiver->startTime;
  pRsp->ack = pReceiver->ack;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;
  pRsp->code = 0;

  // log, then send
  syncLogSendSyncSnapshotRsp(pSyncNode, pRsp, "snapshot receiver end");
  if (syncNodeSendMsgById(&pRsp->destId, pSyncNode, &rspRpc) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
    return -1;
  }

  return 0;
}
M
Minghao Li 已提交
802

803 804
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
//              if the receiver has already started
//                    if sender.start-time > receiver.start-time, restart receiver (reply snapshot start)
//                    if sender.start-time = receiver.start-time, maybe a duplicate msg
//                    if sender.start-time < receiver.start-time, ignore
//              else
//                    wait for clock match
//                    start receiver (reply snapshot start)
//
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
//              a. create writer with <begin, end>
//
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver (apply snapshot data, update commit index, maybe reconfig)
//
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
//
// condition 5, got data, update ack
//
823
/*
 * Entry point for a SyncSnapshotSend message arriving at this node.
 *
 * Messages from nodes outside the raft group, or with a term lower than the
 * local term, are logged and dropped (return 0). A higher term makes this node
 * step down first. The election timer is reset, and the message is then
 * dispatched on its sequence number, provided this node is a follower and the
 * terms match.
 *
 * The return values of the per-sequence handlers are not propagated. -1 is
 * returned only for a non-follower state, a term mismatch, an unexpected
 * sequence number, or a failed log buffer re-init after the END message.
 */
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
  SyncSnapshotSend      *pMsg = pRpcMsg->pCont;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // sender is no longer part of this group: ignore
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
    return 0;
  }

  // stale term: ignore
  if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
    return 0;
  }

  // newer term: step down before processing
  if (pMsg->term > pSyncNode->pRaftStore->currentTerm) {
    syncNodeStepDown(pSyncNode, pMsg->term);
  }
  syncNodeResetElectTimer(pSyncNode);

  // only a follower accepts snapshot data
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    sRError(pReceiver, "snapshot receiver not follower");
    return -1;
  }

  // terms must agree at this point
  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    sRError(pReceiver, "snapshot receiver term not equal");
    return -1;
  }

  // dispatch on the sequence number
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
    syncNodeOnSnapshotPre(pSyncNode, pMsg);
    return 0;
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
    syncNodeOnSnapshotBegin(pSyncNode, pMsg);
    return 0;
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
    syncNodeOnSnapshotEnd(pSyncNode, pMsg);
    // the log buffer is re-initialised after the end message has been handled
    if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
      sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
      return -1;
    }
    return 0;
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
    // force close, no response is sent
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
    snapshotReceiverForceStop(pReceiver);
    return 0;
  }

  if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq");
    syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
    return 0;
  }

  // anything else is an unexpected sequence number
  sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
  return -1;
}

M
Minghao Li 已提交
885 886 887
/*
 * Sender-side handling of the receiver's reply to the pre-snapshot message.
 *
 * Looks up the snapshot sender for the replying node, fetches the current
 * snapshot info from the FSM, and prepares the <begin, end> range: begin comes
 * from the reply (snapBeginIndex), end is the snapshot's lastApplyIndex. It
 * then starts the FSM snapshot reader, advances the peer's next index to
 * lastApplyIndex + 1, and sends a SYNC_SNAPSHOT_SEQ_BEGIN message.
 *
 * Returns 0 when the begin message was sent, -1 on failure. terrno is set to
 * TSDB_CODE_SYN_INTERNAL_ERROR when no sender exists for the peer or when the
 * requested begin index is beyond the snapshot's last applied index.
 */
int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
  // get sender
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
  if (pSender == NULL) {
    sNError(pSyncNode, "prepare snapshot error since sender is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // prepare <begin, end>
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapshot.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);

  if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
    // both indexes are 64-bit (see the info log above), so they must be printed with PRId64 rather than %d
    sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
            pMsg->snapBeginIndex, snapshot.lastApplyIndex);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update sender
  pSender->snapshot = snapshot;

  // start reader
  int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader));
  if (code != 0) {
    sSError(pSender, "prepare snapshot failed since %s", terrstr());
    return -1;
  }

  // update next index
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);

  // update seq
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // build begin msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "prepare snapshot failed since build msg error");
    return -1;
  }

  SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
  pSendMsg->srcId = pSender->pSyncNode->myRaftId;
  pSendMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pSendMsg->beginIndex = pSender->snapshotParam.start;
  pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pSendMsg->lastConfig = pSender->lastConfig;
  pSendMsg->startTime = pSender->startTime;
  pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // send msg
  syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
  if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "prepare snapshot failed since send msg error");
    return -1;
  }

  return 0;
}

956 957 958 959 960 961
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
S
Shengliang Guan 已提交
962
/*
 * Entry point for a SyncSnapshotRsp message arriving at the snapshot sender.
 *
 * The response is rejected (-1) when the replying node is no longer in the
 * raft group, when no sender exists for it, when its startTime differs from
 * the sender's, when this node is not the leader, or when the terms differ.
 *
 * Otherwise the ack value selects the action:
 *   - SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT: prepare <begin, end> and send the begin
 *     message; the result of that step is returned to the caller
 *   - SYNC_SNAPSHOT_SEQ_BEGIN: update progress and send the first data block
 *   - SYNC_SNAPSHOT_SEQ_END: stop the sender and reset the log replication
 *     manager of the peer, if there is one
 *   - ack == sender seq: update progress and send the next block
 *   - ack == sender seq - 1: resend
 *   - anything else: log an error and return -1
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
  SyncSnapshotRsp *pMsg = pRpcMsg->pCont;

  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
    return -1;
  }

  // get sender
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  if (pMsg->startTime != pSender->startTime) {
    // The receive-log helper is called with a message only, so placeholders in that message never get
    // their arguments. Format the two timestamps through sSError and hand the helper a plain message.
    sSError(pSender, "snapshot sender startTime:%" PRId64 " not equal to rsp startTime:%" PRId64, pSender->startTime,
            pMsg->startTime);
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender and receiver start time not match");
    return -1;
  }

  // state, term, seq/ack
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    sSError(pSender, "snapshot sender not leader");
    return -1;
  }

  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    sSError(pSender, "snapshot sender term not equal");
    return -1;
  }

  // prepare <begin, end>, send begin msg
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
    // propagate a failure instead of reporting success, like the other branches do
    return syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
  }

  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
      return -1;
    }

    if (snapshotSend(pSender) != 0) {
      return -1;
    }
    return 0;
  }

  // receive ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
    if (pMgr) {
      syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
      syncLogReplMgrReset(pMgr);
    }
    return 0;
  }

  // send next msg
  if (pMsg->ack == pSender->seq) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq");
    // update sender ack
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
      return -1;
    }
    if (snapshotSend(pSender) != 0) {
      return -1;
    }

  } else if (pMsg->ack == pSender->seq - 1) {
    // maybe resend
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
    snapshotReSend(pSender);

  } else {
    // error log
    sSError(pSender, "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
    return -1;
  }

  return 0;
}