syncSnapshot.c 35.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21
#include "syncRaftLog.h"
M
Minghao Li 已提交
22
#include "syncRaftStore.h"
M
Minghao Li 已提交
23
#include "syncReplication.h"
M
Minghao Li 已提交
24
#include "syncUtil.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26
// Create the snapshot sender that serves one replica.
//
// pSyncNode:    owning sync node; its FSM must provide the three snapshot read callbacks
// replicaIndex: index of the target replica, stored in the sender
//
// Returns the new sender (not started), or NULL when a read callback is missing or the
// allocation fails; terrno is set to TSDB_CODE_OUT_OF_MEMORY in the latter case.
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // refuse to build a sender when the FSM cannot read snapshots
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // who we belong to and who we send to
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;

  // transfer progress: nothing started yet
  pNew->start = false;
  pNew->finish = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pNew->startTime = 0;
  pNew->endTime = 0;

  // no reader and no cached block
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;

  // current term, then the snapshot info reported by the FSM
  pNew->term = raftStoreGetTerm(pSyncNode);
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &pNew->snapshot);

  return pNew;
}
M
Minghao Li 已提交
54

M
Minghao Li 已提交
55
// Release a snapshot sender together with the block and reader it still holds.
// A NULL sender is accepted and ignored.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // drop the cached block, if any
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // hand an open reader back to the FSM
  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  // finally the sender itself
  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75
// Report whether the sender's start flag is currently set.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

M
Minghao Li 已提交
76
// Reset the sender for a new transfer, mark it started and send the prepare message
// (seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) to the target replica.
//
// Returns 0 on success, -1 when the message cannot be built or sent.
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  SSyncNode *pNode = pSender->pSyncNode;

  // progress flags and counters
  pSender->start = true;
  pSender->finish = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->sendingMS = 0;

  // no reader and no cached block yet
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;

  // snapshot range and description are unknown at this point
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
  memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));

  // term and timestamps of this run
  pSender->term = raftStoreGetTerm(pNode);
  pSender->startTime = taosGetTimestampMs();
  pSender->lastSendTime = pSender->startTime;

  // build the prepare message (no payload)
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotSend *pPrep = rpcMsg.pCont;
  pPrep->srcId = pNode->myRaftId;
  pPrep->destId = pNode->replicasId[pSender->replicaIndex];
  pPrep->term = raftStoreGetTerm(pNode);
  pPrep->beginIndex = pSender->snapshotParam.start;
  pPrep->lastIndex = pSender->snapshot.lastApplyIndex;
  pPrep->lastTerm = pSender->snapshot.lastApplyTerm;
  pPrep->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pPrep->lastConfig = pSender->lastConfig;
  pPrep->startTime = pSender->startTime;
  pPrep->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;

  // log, then send
  syncLogSendSyncSnapshotSend(pNode, pPrep, "snapshot sender start");

  if (syncNodeSendMsgById(&pPrep->destId, pNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  return 0;
}

129
// Stop the sender: clear the start flag, remember whether the transfer finished,
// stamp the end time and release the reader and the cached block.
//
// finish: value stored in pSender->finish
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // record the outcome
  pSender->start = false;
  pSender->finish = finish;
  pSender->endTime = taosGetTimestampMs();

  // hand an open reader back to the FSM
  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  // drop the cached block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }
}

151
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
152
// seq = ack + 1, already updated
153
// Read the next snapshot block through the FSM and send it with the sender's current seq.
//
// The block cached by the previous call is freed first. When the read yields no data
// (blockLen <= 0), seq is switched to SYNC_SNAPSHOT_SEQ_END and an end message is sent.
// On a successful send, lastSendTime is refreshed.
//
// Returns 0 on success, -1 when reading, building the message or sending fails.
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (current seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &pSender->pCurrentBlock, &pSender->blockLen);
  if (ret != 0) {
    sSError(pSender, "snapshot sender read failed since %s", terrstr());
    return -1;
  }

  if (pSender->blockLen > 0) {
    // has read data
    sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
  } else {
    // read finish, update seq to end
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
    sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
  }

  // build msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
    // the format has exactly one %s, so only terrstr() may be passed here
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;

  // copy the payload that was just read
  if (pSender->pCurrentBlock != NULL) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // event log
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
  } else {
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
  }

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
  return 0;
}

M
Minghao Li 已提交
217 218
// send snapshot data from cache
// Send the cached block again with the sender's current seq; nothing is read from the FSM.
// On a successful send, lastSendTime is refreshed.
//
// Returns 0 on success, -1 when the message cannot be built or sent.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  SSyncNode *pNode = pSender->pSyncNode;

  // allocate a message large enough for the cached block
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  // header
  SyncSnapshotSend *pResend = rpcMsg.pCont;
  pResend->srcId = pNode->myRaftId;
  pResend->destId = pNode->replicasId[pSender->replicaIndex];
  pResend->term = raftStoreGetTerm(pNode);
  pResend->beginIndex = pSender->snapshotParam.start;
  pResend->lastIndex = pSender->snapshot.lastApplyIndex;
  pResend->lastTerm = pSender->snapshot.lastApplyTerm;
  pResend->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pResend->lastConfig = pSender->lastConfig;
  pResend->seq = pSender->seq;

  // payload, only when a non-empty block is cached
  if (pSender->blockLen > 0 && pSender->pCurrentBlock != NULL) {
    memcpy(pResend->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  syncLogSendSyncSnapshotSend(pNode, pResend, "snapshot sender resend");

  if (syncNodeSendMsgById(&pResend->destId, pNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
  return 0;
}

S
Shengliang Guan 已提交
254 255 256 257 258 259 260
// Advance the sender after a response: when the acknowledged seq equals the seq last sent,
// store it as ack and move seq forward by one.
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR when ack and seq differ.
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->ack == pSender->seq) {
    pSender->ack = pMsg->ack;
    pSender->seq += 1;

    sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
    return 0;
  }

  // the response does not acknowledge what was sent last
  sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}

M
Minghao Li 已提交
268 269 270
// return 0, start ok
// return 0 as well when the sender is already started or the last snapshot finished too recently (request ignored)
// return -1, error
271
// Start sending a snapshot to the replica identified by pDestId.
//
// Returns 0 when the sender was started, and also when the request is ignored because the
// sender is already started or finished less than SNAPSHOT_WAIT_MS ago.
// Returns -1 when no sender exists for pDestId or starting it fails.
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
  sNInfo(pSyncNode, "snapshot sender starting ...");

  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
    sNError(pSyncNode, "snapshot sender start error since get failed");
    return -1;
  }

  // a transfer is already running
  if (snapshotSenderIsStart(pSender)) {
    sSInfo(pSender, "snapshot sender already start, ignore");
    return 0;
  }

  // the previous transfer finished only a moment ago
  bool finishedRecently = pSender->finish && (taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS);
  if (finishedRecently) {
    sSInfo(pSender, "snapshot sender start too frequently, ignore");
    return 0;
  }

  sSInfo(pSender, "snapshot sender start");

  if (snapshotSenderStart(pSender) != 0) {
    sSError(pSender, "snapshot sender start error since %s", terrstr());
    return -1;
  }

  return 0;
}
300

301
// Create a snapshot receiver for the given sync node.
//
// pSyncNode: owning sync node; its FSM must provide the three snapshot write callbacks
// fromId:    raft id stored as the expected sender
//
// Returns the new receiver (not started), or NULL when a write callback is missing or the
// allocation fails; terrno is set to TSDB_CODE_OUT_OF_MEMORY in the latter case.
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  // refuse to build a receiver when the FSM cannot write snapshots
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // who we belong to and who we receive from
  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;

  // nothing started, no writer
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;
  pNew->term = raftStoreGetTerm(pSyncNode);

  // empty snapshot description
  pNew->snapshot.data = NULL;
  pNew->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pNew->snapshot.lastApplyTerm = 0;
  pNew->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  return pNew;
}
M
Minghao Li 已提交
325

326
// Release a snapshot receiver. An open writer is closed first through FpSnapshotStopWrite
// with its third argument set to false; a failure there is only logged.
// A NULL receiver is accepted and ignored.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  if (pReceiver->pWriter != NULL) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    rc = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot);
    if (rc != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
    }
    pReceiver->pWriter = NULL;
  }

  // finally the receiver itself
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
342

343 344
// Report whether the receiver's start flag is currently set.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

345
// Open the FSM snapshot writer for the transfer announced by pBeginMsg.
//
// The receiver's ack is reset to SYNC_SNAPSHOT_SEQ_BEGIN and its snapshot description and
// range are taken from the message before FpSnapshotStartWrite is called.
//
// Returns 0 on success; -1 when a writer is already open (terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR) or when the FSM fails to start writing.
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (pReceiver->pWriter != NULL) {
    // no conversion specifier here: no argument is passed after the format
    sRError(pReceiver, "snapshot receiver writer is not null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update ack
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // update snapshot
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // start writer
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
                                                                 &pReceiver->pWriter);
  if (ret != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
    return -1;
  }

  // event log
  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}

375
// Mark the receiver as started for the transfer announced by the prepare message pPreMsg.
// Does nothing except log when the receiver is already started.
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver has started");
    return;
  }

  // sender identity and start time come from the message
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->startTime = pPreMsg->startTime;

  // local state
  pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode);
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
  pReceiver->start = true;

  sRInfo(pReceiver, "snapshot receiver is start");
}
M
Minghao Li 已提交
390

391
// stop the receiver without applying the snapshot:
// an open writer is closed via FpSnapshotStopWrite with its third argument false (not apply), then start is set to false
393
// Stop the receiver. An open writer is closed through FpSnapshotStopWrite with its third
// argument set to false; a failure there is only logged. The start flag is cleared last.
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

  if (pReceiver->pWriter == NULL) {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
  } else {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    rc = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot);
    if (rc != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
    }
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
}

410
// when recv last snapshot block, apply data into snapshot
411 412
// Handle the last message of a transfer: write its payload (if any), restore the log store
// from the snapshot, raise commit index and term where the snapshot is ahead, and close the
// writer with its third argument set to true. On success ack becomes SYNC_SNAPSHOT_SEQ_END.
//
// Returns 0 on success, -1 when no writer is open or any of the steps fails.
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  // nothing can be finished without an open writer
  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
    return -1;
  }

  SSyncNode *pNode = pReceiver->pSyncNode;
  int32_t    rc = 0;

  // write the trailing payload
  sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    rc = pNode->pFsm->FpSnapshotDoWrite(pNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
    if (rc != 0) {
      sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
      return -1;
    }
  }

  // restore the log store from the snapshot's last index
  sRInfo(pReceiver, "snapshot receiver log restore");
  rc = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, pMsg->lastIndex);
  if (rc != 0) {
    sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
    return -1;
  }

  // commit index only moves forward
  if (pNode->commitIndex < pReceiver->snapshot.lastApplyIndex) {
    pNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
  }

  // the term is raised when the snapshot carries a newer one
  if (raftStoreGetTerm(pNode) < pReceiver->snapshot.lastApplyTerm) {
    raftStoreSetTerm(pNode, pReceiver->snapshot.lastApplyTerm);
  }

  // close the writer and apply
  sRInfo(pReceiver, "snapshot receiver apply write");
  rc = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, true, &pReceiver->snapshot);
  if (rc != 0) {
    sRError(pReceiver, "snapshot receiver apply failed  since %s", terrstr());
    return -1;
  }
  pReceiver->pWriter = NULL;

  // progress: transfer complete
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
  return 0;
}

467 468
// apply data block
// update progress
S
Shengliang Guan 已提交
469 470 471
// Write one data block of a running transfer and advance ack to the message's seq.
//
// The message must carry seq == ack + 1 and a writer must be open.
//
// Returns 0 on success; -1 on an unexpected seq (terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG),
// a missing writer (terrno = TSDB_CODE_SYN_INTERNAL_ERROR) or a failed write.
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  // only the block directly following the last acknowledged one is accepted
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
    terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
    return -1;
  }

  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);

  // hand a non-empty payload to the FSM
  if (pMsg->dataLen > 0) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    rc = pNode->pFsm->FpSnapshotDoWrite(pNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
    if (rc != 0) {
      sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
      return -1;
    }
  }

  // progress
  pReceiver->ack = pMsg->seq;

  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
}

M
Minghao Li 已提交
502 503 504 505 506
// Compute the index at which a snapshot for this node begins.
//
// For an mnode this is SYNC_INDEX_BEGIN; otherwise it is one past the larger of the node's
// commit index and the WAL's committed version. The result is logged and returned.
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  SyncIndex beginIndex = SYNC_INDEX_INVALID;

  if (!syncNodeIsMnode(ths)) {
    SSyncLogStoreData *pLogData = ths->pLogStore->data;
    int64_t            committedVer = walGetCommittedVer(pLogData->pWal);

    beginIndex = TMAX(ths->commitIndex, committedVer) + 1;
    sNInfo(ths, "snapshot begin index is %" PRId64, beginIndex);
  } else {
    beginIndex = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", beginIndex);
  }

  return beginIndex;
}

521
// Handle the prepare-snapshot message (seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT).
//
// Decision, based on the receiver state and the start times:
//   - receiver not started, or the message carries a later startTime: (re)start the receiver,
//     unless the message's startTime lags the local clock by more than SNAPSHOT_MAX_CLOCK_SKEW_MS;
//   - same startTime: the message is a repeat, only the reply is sent again;
//   - earlier startTime: the message is stale, an error code is replied.
// A reply carrying the resulting code and the local snapshot begin index is always sent.
//
// Returns the code placed in the reply (0 or TSDB_CODE_SYN_INTERNAL_ERROR), or -1 when the
// reply cannot be built or sent.
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int64_t                timeNow = taosGetTimestampMs();
  int32_t                code = 0;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    if (pMsg->startTime > pReceiver->startTime) {
      // the message belongs to a newer transfer than the one in progress
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " start receiver",
             pReceiver->startTime, pMsg->startTime);
      goto _START_RECEIVER;
    } else if (pMsg->startTime == pReceiver->startTime) {
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
             pReceiver->startTime, pMsg->startTime);
      goto _SEND_REPLY;
    } else {
      // the message is older than the transfer in progress: ignore it, reply with an error
      sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " ignore",
              pReceiver->startTime, pMsg->startTime);
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      code = terrno;
      goto _SEND_REPLY;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
    goto _START_RECEIVER;
  }

_START_RECEIVER:
  if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
    sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
            pMsg->startTime);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    code = terrno;
  } else {
    // waiting for clock match
    // NOTE(review): this loop has no upper bound when the message's startTime is far ahead of
    // the local clock - confirm that the sender cannot produce such a value
    while (timeNow < pMsg->startTime) {
      sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", startTime:%" PRId64, timeNow,
             pMsg->startTime);
      taosMsleep(10);
      timeNow = taosGetTimestampMs();
    }

    if (snapshotReceiverIsStart(pReceiver)) {
      sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
      snapshotReceiverStop(pReceiver);
    }

    snapshotReceiverStart(pReceiver, pMsg);  // set start-time same with sender
  }

_SEND_REPLY:
    // build msg
    ;  // empty statement: keeps the compiler happy, since a declaration follows the label

  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pMsg->seq;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver failed to send resp since %s", terrstr());
    return -1;
  }

  return code;
}

// Handle the begin message of a transfer: the receiver must be started and its startTime must
// match the message's before the FSM writer is opened. A reply is sent in every case.
//
// Returns the code placed in the reply: 0 on success, otherwise terrno when it is set or
// TSDB_CODE_SYN_INTERNAL_ERROR. Returns -1 when the reply cannot be built or sent.
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  // each check is only evaluated when the previous one passed
  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "snapshot receiver begin failed since not start");
  } else if (pReceiver->startTime != pMsg->startTime) {
    sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
            pReceiver->startTime, pMsg->startTime);
  } else if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) {
    sRError(pReceiver, "snapshot receiver begin failed since start writer failed");
  } else {
    code = 0;
  }

  // on failure prefer the specific error left in terrno
  if (code != 0 && terrno != 0) {
    code = terrno;
  }

  // reply
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pReply = rpcMsg.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = raftStoreGetTerm(pSyncNode);
  pReply->lastIndex = pMsg->lastIndex;
  pReply->lastTerm = pMsg->lastTerm;
  pReply->startTime = pReceiver->startTime;
  pReply->ack = pReceiver->ack;  // the receiver may already be closed
  pReply->code = code;
  pReply->snapBeginIndex = pReceiver->snapshotParam.start;

  syncLogSendSyncSnapshotRsp(pSyncNode, pReply, "snapshot receiver begin");
  if (syncNodeSendMsgById(&pReply->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return code;
}

660
// Handle a data message of a running transfer: wait until the local clock reaches the
// message's startTime, write the block and reply with the receiver's ack.
//
// Returns the code placed in the reply (0 on success), or -1 when the reply cannot be built
// or sent.
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // sleep in 10 ms steps until the local clock has caught up with the sender's start time
  for (int64_t now = taosGetTimestampMs(); now < pMsg->startTime; now = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, now,
           pMsg->startTime);
    taosMsleep(10);
  }

  int32_t code = 0;
  if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
    code = terrno;
    // NOTE(review): an error code is compared against a seq constant here - confirm the intent
    if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // reply
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pReply = rpcMsg.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = raftStoreGetTerm(pSyncNode);
  pReply->lastIndex = pMsg->lastIndex;
  pReply->lastTerm = pMsg->lastTerm;
  pReply->startTime = pReceiver->startTime;
  pReply->ack = pReceiver->ack;  // the receiver may already be closed
  pReply->code = code;
  pReply->snapBeginIndex = pReceiver->snapshotParam.start;

  syncLogSendSyncSnapshotRsp(pSyncNode, pReply, "snapshot receiver received");
  if (syncNodeSendMsgById(&pReply->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return code;
}

M
Minghao Li 已提交
710 711 712 713 714 715 716 717
// Receiver-side handler for a SYNC_SNAPSHOT_SEQ_END message.
//
// Waits until the local clock has reached pMsg->startTime, finishes the
// receiver (stopping it only when finishing succeeded), then builds and sends
// a SyncSnapshotRsp carrying the finish result back to the message source.
//
// Returns the value of snapshotReceiverFinish(), or -1 if the response could
// not be built or sent.
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // hold the message until the local clock catches up with its start time
  for (int64_t nowMs = taosGetTimestampMs(); nowMs < pMsg->startTime; nowMs = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, nowMs,
           pMsg->startTime);
    taosMsleep(10);
  }

  // finish the receiver; on failure it is left running and the code is reported to the sender
  const int32_t finishCode = snapshotReceiverFinish(pReceiver, pMsg);
  if (finishCode == 0) {
    snapshotReceiverStop(pReceiver);
  }

  // allocate the response message
  SRpcMsg rspRpc = {0};
  if (syncBuildSnapshotSendRsp(&rspRpc, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
    return -1;
  }

  // fill the response: routing first, then the snapshot position echoed from the request,
  // then the receiver-side state (read after the receiver may already have been stopped)
  SyncSnapshotRsp *pRsp = rspRpc.pCont;
  pRsp->srcId = pSyncNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = raftStoreGetTerm(pSyncNode);
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pReceiver->startTime;
  pRsp->ack = pReceiver->ack;
  pRsp->code = finishCode;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;

  // log and deliver the response
  syncLogSendSyncSnapshotRsp(pSyncNode, pRsp, "snapshot receiver end");
  if (syncNodeSendMsgById(&pRsp->destId, pSyncNode, &rspRpc) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
    return -1;
  }

  return finishCode;
}
M
Minghao Li 已提交
756

757 758
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT
//              if receiver already started
//                    if sender.start-time > receiver.start-time, restart receiver (reply snapshot start)
//                    if sender.start-time = receiver.start-time, maybe duplicate msg
//                    if sender.start-time < receiver.start-time, ignore
//              else
//                    wait for clock match
//                    start receiver (reply snapshot start)
//
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
//              a. create writer with <begin, end>
//
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver (apply snapshot data, update commit index, maybe reconfig)
//
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
//
// condition 5, got data, update ack
//
777
// Entry point for SyncSnapshotSend messages on the receiving node.
//
// Rejects messages from nodes outside the raft group or with a stale term,
// steps down on a newer term, resets the election timer, and then dispatches
// on pMsg->seq to the matching handler. Messages are only processed while this
// node is a follower whose term equals the message term.
//
// Returns the handler's result, 0 for a force-close, or -1 on rejection.
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
  SyncSnapshotSend      *pMsg = pRpcMsg->pCont;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // the source replica may already have been dropped from the group
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // stale term: reject
  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // newer term: step down before handling the message
  if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
    syncNodeStepDown(pSyncNode, pMsg->term);
  }
  syncNodeResetElectTimer(pSyncNode);

  // only a follower in the same term as the message handles snapshot traffic
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    sRError(pReceiver, "snapshot receiver not follower");
    return -1;
  }
  if (pMsg->term != raftStoreGetTerm(pSyncNode)) {
    sRError(pReceiver, "snapshot receiver term not equal");
    return -1;
  }

  // dispatch on the sequence number
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
    return syncNodeOnSnapshotPrep(pSyncNode, pMsg);
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
    return syncNodeOnSnapshotBegin(pSyncNode, pMsg);
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
    int32_t endCode = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
    // the log buffer is re-initialized regardless of the end handler's result
    if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
      sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
      endCode = -1;
    }
    return endCode;
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
    // force close: stop the receiver, no response is sent
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
    snapshotReceiverStop(pReceiver);
    return 0;
  }

  if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
    return syncNodeOnSnapshotReceive(pSyncNode, pMsg);
  }

  // sequence number outside every known range
  sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
  return -1;
}

842
// Sender-side handler for the receiver's reply to the pre-snapshot message.
//
// Reads the current snapshot info from the FSM, fixes the transfer range to
// <pMsg->snapBeginIndex, snapshot.lastApplyIndex>, starts the FSM snapshot
// reader, moves the peer's next index past the snapshot and sends the
// SYNC_SNAPSHOT_SEQ_BEGIN message to the replica this sender serves.
//
// pSyncNode  local sync node
// pSender    snapshot sender associated with the replying replica
// pMsg       the received response; snapBeginIndex is the receiver's begin index
//
// Returns 0 on success, -1 on failure (begin index beyond the snapshot,
// reader start failure, or message build/send failure).
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // prepare <begin, end>
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapshot.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);

  // the receiver must not ask for data beyond what the snapshot covers
  if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
    // both indexes are 64-bit (see the PRId64 log above); printing them with %d is a format mismatch
    sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
            pMsg->snapBeginIndex, snapshot.lastApplyIndex);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update sender
  pSender->snapshot = snapshot;

  // start reader
  int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code != 0) {
    sSError(pSender, "prepare snapshot failed since %s", terrstr());
    return -1;
  }

  // update next index: log replication resumes right after the snapshot
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);

  // update seq
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // build begin msg (no data payload)
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "prepare snapshot failed since build msg error");
    return -1;
  }

  SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
  pSendMsg->srcId = pSender->pSyncNode->myRaftId;
  pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode);
  pSendMsg->beginIndex = pSender->snapshotParam.start;
  pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pSendMsg->lastConfig = pSender->lastConfig;
  pSendMsg->startTime = pSender->startTime;
  pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // send msg
  syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
  if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "prepare snapshot failed since send msg error");
    return -1;
  }

  return 0;
}

905 906 907 908 909 910
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
S
Shengliang Guan 已提交
911
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
912 913
  SyncSnapshotRsp *pMsg = pRpcMsg->pCont;

914
  // if already drop replica, do not process
915
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
M
Minghao Li 已提交
916
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
917
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
918
    return -1;
919 920
  }

M
Minghao Li 已提交
921
  // get sender
S
Shengliang Guan 已提交
922 923 924 925 926 927
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
928 929

  // state, term, seq/ack
930
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
931
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
932
    sSError(pSender, "snapshot sender not leader");
933
    terrno = TSDB_CODE_SYN_NOT_LEADER;
934 935 936 937 938
    goto _ERROR;
  }

  if (pMsg->startTime != pSender->startTime) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
939 940
    sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, error:%s 0x%x", pMsg->startTime,
            pSender->startTime, tstrerror(pMsg->code), pMsg->code);
941
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
942
    goto _ERROR;
943
  }
M
Minghao Li 已提交
944

945 946
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (pMsg->term != currentTerm) {
947 948
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
    sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
949
            currentTerm);
950
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
951
    goto _ERROR;
952
  }
S
Shengliang Guan 已提交
953

954 955
  if (pMsg->code != 0) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
956
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
957
    terrno = pMsg->code;
958
    goto _ERROR;
959
  }
M
Minghao Li 已提交
960

961
  // prepare <begin, end>, send begin msg
962
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
963
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
964
    return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
965
  }
966

967 968 969 970 971 972 973
  if (pSender->pReader == NULL || pSender->finish) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
    sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
    terrno = pMsg->code;
    goto _ERROR;
  }

974 975 976 977 978
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
      return -1;
    }
979

980 981 982 983 984 985 986 987 988 989 990
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
    return 0;
  }

  // receive ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
991
    syncLogReplMgrReset(pMgr);
992 993 994 995 996 997 998 999
    return 0;
  }

  // send next msg
  if (pMsg->ack == pSender->seq) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
    // update sender ack
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
1000
      return -1;
1001
    }
1002 1003 1004 1005 1006 1007
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
  } else if (pMsg->ack == pSender->seq - 1) {
    // maybe resend
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
1008 1009 1010
    if (snapshotReSend(pSender) != 0) {
      return -1;
    }
M
Minghao Li 已提交
1011
  } else {
1012
    // error log
1013 1014 1015 1016
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
    sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
1017
    syncLogReplMgrReset(pMgr);
1018
    return -1;
1019 1020 1021
  }

  return 0;
1022 1023 1024 1025

_ERROR:
  snapshotSenderStop(pSender, true);
  SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
1026
  syncLogReplMgrReset(pMgr);
1027 1028

  return -1;
1029
}