syncSnapshot.c 35.6 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21
#include "syncRaftLog.h"
M
Minghao Li 已提交
22
#include "syncRaftStore.h"
M
Minghao Li 已提交
23
#include "syncReplication.h"
M
Minghao Li 已提交
24
#include "syncUtil.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26
/*
 * Create a snapshot sender for the replica at `replicaIndex`.
 *
 * Returns NULL (terrno untouched) when the FSM lacks any callback this sender
 * invokes, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when allocation fails.
 * The new sender is idle (start == false) and holds the FSM snapshot info
 * filled in by FpGetSnapshotInfo. Release it with snapshotSenderDestroy().
 */
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // FpGetSnapshotInfo is called unconditionally below, so it belongs in the
  // capability check; otherwise a missing callback would be a call through NULL.
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL) && (pSyncNode->pFsm->FpGetSnapshotInfo != NULL);
  if (!condition) return NULL;

  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
  pSender->term = pSyncNode->pRaftStore->currentTerm;
  pSender->startTime = 0;
  pSender->endTime = 0;
  pSender->lastSendTime = 0;
  pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
  pSender->finish = false;

  return pSender;
}
M
Minghao Li 已提交
54

M
Minghao Li 已提交
55
/*
 * Release a sender created by snapshotSenderCreate().
 * Order: free the cached data block, hand an open reader back to the FSM via
 * FpSnapshotStopRead, then free the sender. A NULL sender is ignored.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender != NULL) {
    if (pSender->pCurrentBlock) {
      taosMemoryFree(pSender->pCurrentBlock);
      pSender->pCurrentBlock = NULL;
    }

    if (pSender->pReader) {
      pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
      pSender->pReader = NULL;
    }

    taosMemoryFree(pSender);
  }
}
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75
/* Whether a snapshot transfer is currently running on this sender. */
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  bool started = pSender->start;
  return started;
}

M
Minghao Li 已提交
76
/*
 * Begin a new snapshot transfer. Resets the sender's progress and cached
 * snapshot metadata, then sends the PRE_SNAPSHOT message (no payload) to the
 * replica.
 *
 * Returns 0 once the message has been passed to syncNodeSendMsgById(), or -1
 * if the message could not be built or sent. On failure `start` stays true.
 */
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  // Release leftovers from an earlier transfer before the fields are reset.
  // Simply overwriting them with NULL would leak the block and never hand the
  // reader back to the FSM.
  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->blockLen = 0;

  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;

  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));

  pSender->sendingMS = 0;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pSender->startTime = taosGetTimestampMs();
  pSender->lastSendTime = pSender->startTime;
  pSender->finish = false;

  // build begin msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->startTime = pSender->startTime;
  pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;

  // event log
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  return 0;
}

129
/*
 * Stop the sender and release its resources.
 * `finish` records whether the transfer completed. endTime is stamped here;
 * syncNodeStartSnapshot() compares it against SNAPSHOT_WAIT_MS when `finish`
 * is set. Closes an open reader and frees the cached block.
 */
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // flags first, then timestamp
  pSender->start = false;
  pSender->finish = finish;
  pSender->endTime = taosGetTimestampMs();

  if (pSender->pReader) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  if (pSender->pCurrentBlock) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }
}

151
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
152
// seq = ack + 1, already updated
153
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
154
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
155 156
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
157
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
158 159 160 161 162
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
S
Shengliang Guan 已提交
163 164 165 166 167 168
                                                           &pSender->pCurrentBlock, &pSender->blockLen);
  if (ret != 0) {
    sSError(pSender, "snapshot sender read failed since %s", terrstr());
    return -1;
  }

M
Minghao Li 已提交
169 170
  if (pSender->blockLen > 0) {
    // has read data
171
    sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
172
  } else {
173
    // read finish, update seq to end
M
Minghao Li 已提交
174
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
S
Shengliang Guan 已提交
175
    sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
176
  }
M
Minghao Li 已提交
177

M
Minghao Li 已提交
178
  // build msg
179
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
180 181 182 183
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
    return -1;
  }
184 185

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
186
  pMsg->srcId = pSender->pSyncNode->myRaftId;
187
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
M
Minghao Li 已提交
188
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
189
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
190 191
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
192 193
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
194
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
195

M
Minghao Li 已提交
196 197 198
  if (pSender->pCurrentBlock != NULL) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }
M
Minghao Li 已提交
199

200
  // event log
S
Shengliang Guan 已提交
201
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
S
Shengliang Guan 已提交
202
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
S
Shengliang Guan 已提交
203
  } else {
S
Shengliang Guan 已提交
204
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
S
Shengliang Guan 已提交
205
  }
206 207 208 209 210 211 212 213

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
M
Minghao Li 已提交
214 215 216
  return 0;
}

M
Minghao Li 已提交
217 218
// Re-send the block cached for the current seq without reading from the FSM.
// Used when the previous message for pSender->seq needs to go out again.
// Returns 0 on success, or -1 if the message cannot be built or sent.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  SRpcMsg resendMsg = {0};
  int32_t code = syncBuildSnapshotSend(&resendMsg, pSender->blockLen, pSender->pSyncNode->vgId);
  if (code != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  // header fields mirror the current sender state
  SyncSnapshotSend *pMsg = resendMsg.pCont;
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;

  bool hasPayload = (pSender->pCurrentBlock != NULL) && (pSender->blockLen > 0);
  if (hasPayload) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");

  code = syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &resendMsg);
  if (code != 0) {
    sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
  return 0;
}

S
Shengliang Guan 已提交
254 255 256 257 258 259 260
// Accept an ack for the seq just sent and advance to the next seq.
// Returns 0 when pMsg->ack matches pSender->seq. Otherwise returns -1 with
// terrno = TSDB_CODE_SYN_INTERNAL_ERROR and the sender is left unchanged.
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->ack == pSender->seq) {
    pSender->ack = pMsg->ack;
    pSender->seq += 1;
    sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
    return 0;
  }

  sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}

M
Minghao Li 已提交
268 269 270
// return 0, start ok
// return 1, last snapshot finish ok
// return -1, error
271
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
S
Shengliang Guan 已提交
272
  sNInfo(pSyncNode, "snapshot sender starting ...");
M
Minghao Li 已提交
273

274 275
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
S
Shengliang Guan 已提交
276
    sNError(pSyncNode, "snapshot sender start error since get failed");
M
Minghao Li 已提交
277
    return -1;
278 279
  }

M
Minghao Li 已提交
280
  if (snapshotSenderIsStart(pSender)) {
281
    sSInfo(pSender, "snapshot sender already start, ignore");
M
Minghao Li 已提交
282 283 284
    return 0;
  }

S
Shengliang Guan 已提交
285 286
  if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
    sSInfo(pSender, "snapshot sender start too frequently, ignore");
287
    return 0;
M
Minghao Li 已提交
288 289
  }

290
  sSInfo(pSender, "snapshot sender start");
291

S
Shengliang Guan 已提交
292
  int32_t code = snapshotSenderStart(pSender);
M
Minghao Li 已提交
293
  if (code != 0) {
S
Shengliang Guan 已提交
294
    sSError(pSender, "snapshot sender start error since %s", terrstr());
M
Minghao Li 已提交
295 296
    return -1;
  }
297 298 299

  return 0;
}
300

301
/*
 * Create an idle snapshot receiver for data arriving from `fromId`.
 *
 * Returns NULL (terrno untouched) unless the FSM provides all three
 * snapshot-write callbacks, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 * when allocation fails.
 */
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    return NULL;
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // ownership / identity
  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = pSyncNode->pRaftStore->currentTerm;

  // transfer state
  pReceiver->start = false;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;

  // snapshot metadata, filled in when a BEGIN message arrives
  pReceiver->snapshot.data = NULL;
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pReceiver->snapshot.lastApplyTerm = 0;
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  return pReceiver;
}
M
Minghao Li 已提交
325

326
/*
 * Release a receiver created by snapshotReceiverCreate().
 * An open writer is closed via FpSnapshotStopWrite with apply = false; a
 * failure there is only logged. A NULL receiver is ignored.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  if (pReceiver->pWriter) {
    if (pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                       &pReceiver->snapshot) != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
    }
    pReceiver->pWriter = NULL;
  }

  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
342

343 344
/* Whether this receiver is currently inside a snapshot transfer. */
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  bool started = pReceiver->start;
  return started;
}

345
// Open the FSM snapshot writer for a BEGIN message.
// Resets ack to SYNC_SNAPSHOT_SEQ_BEGIN. Records the snapshot metadata and the
// [beginIndex, lastIndex] range from pBeginMsg, then calls FpSnapshotStartWrite.
// Returns 0 on success, or -1 if:
//   - a writer is already open (terrno = TSDB_CODE_SYN_INTERNAL_ERROR), or
//   - FpSnapshotStartWrite fails.
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (pReceiver->pWriter != NULL) {
    // No arguments follow the format, so it must not contain conversions.
    // The old "vgId:%d, " prefix read a missing argument (undefined behavior).
    sRError(pReceiver, "snapshot receiver writer is not null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update ack
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // update snapshot
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // start writer
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
                                                                 &pReceiver->pWriter);
  if (ret != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
    return -1;
  }

  // event log
  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}

375
// Mark the receiver as started for the transfer announced by pPreMsg.
// Sets ack to SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT and copies the sender id and
// startTime from the message. A receiver that is already started is left
// unchanged.
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (!snapshotReceiverIsStart(pReceiver)) {
    pReceiver->start = true;
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
    pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
    pReceiver->fromId = pPreMsg->srcId;
    pReceiver->startTime = pPreMsg->startTime;

    sRInfo(pReceiver, "snapshot receiver is start");
    return;
  }

  sRInfo(pReceiver, "snapshot receiver has started");
}
M
Minghao Li 已提交
390

391
// just set start = false
S
Shengliang Guan 已提交
392
// FpSnapshotStopWrite should not be called
393
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
S
Shengliang Guan 已提交
394 395
  sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

M
Minghao Li 已提交
396
  if (pReceiver->pWriter != NULL) {
397
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
S
Shengliang Guan 已提交
398 399 400 401
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
    }
M
Minghao Li 已提交
402
    pReceiver->pWriter = NULL;
S
Shengliang Guan 已提交
403 404
  } else {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
405
  }
M
Minghao Li 已提交
406 407

  pReceiver->start = false;
408 409
}

410
// when recv last snapshot block, apply data into snapshot
411 412
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;
413
  if (pReceiver->pWriter != NULL) {
414
    // write data
S
Shengliang Guan 已提交
415
    sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
416 417 418
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
419
      if (code != 0) {
S
Shengliang Guan 已提交
420
        sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
421 422 423 424 425
        return -1;
      }
    }

    // reset wal
S
Shengliang Guan 已提交
426
    sRInfo(pReceiver, "snapshot receiver log restore");
427 428 429
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
S
Shengliang Guan 已提交
430
      sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
431
      return -1;
432 433
    }

434 435 436 437 438
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

M
Minghao Li 已提交
439 440 441 442 443 444
    // maybe update term
    if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
      pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
      raftStorePersist(pReceiver->pSyncNode->pRaftStore);
    }

445
    // stop writer, apply data
S
Shengliang Guan 已提交
446
    sRInfo(pReceiver, "snapshot receiver apply write");
447
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
S
Shengliang Guan 已提交
448
                                                           &pReceiver->snapshot);
449
    if (code != 0) {
S
Shengliang Guan 已提交
450
      sRError(pReceiver, "snapshot receiver apply failed  since %s", terrstr());
451 452
      return -1;
    }
453 454
    pReceiver->pWriter = NULL;

455 456
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
457

458
  } else {
S
Shengliang Guan 已提交
459
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
460
    return -1;
461 462 463
  }

  // event log
S
Shengliang Guan 已提交
464
  sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
465
  return 0;
466 467
}

468 469
// apply data block
// update progress
S
Shengliang Guan 已提交
470 471 472
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
473
    terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
S
Shengliang Guan 已提交
474 475
    return -1;
  }
476

S
Shengliang Guan 已提交
477 478 479 480 481
  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
482

S
Shengliang Guan 已提交
483
  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
484

S
Shengliang Guan 已提交
485 486 487 488 489 490 491 492
  if (pMsg->dataLen > 0) {
    // apply data block
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                 pMsg->data, pMsg->dataLen);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
      return -1;
    }
493
  }
S
Shengliang Guan 已提交
494 495 496 497 498 499 500

  // update progress
  pReceiver->ack = pMsg->seq;

  // event log
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
501 502
}

M
Minghao Li 已提交
503 504 505 506 507
// Compute the first log index the snapshot should cover on this node.
// mnode: always SYNC_INDEX_BEGIN.
// Otherwise: commitIndex + 1. If the log store is non-empty and the WAL's
// committed version disagrees with commitIndex, the mismatch is logged and
// walCommitVer + 1 is used instead.
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    SyncIndex mnodeStart = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", mnodeStart);
    return mnodeStart;
  }

  SSyncLogStoreData *pData = ths->pLogStore->data;
  SWal              *pWal = pData->pWal;

  bool    isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
  int64_t walCommitVer = walGetCommittedVer(pWal);

  SyncIndex snapStart;
  bool      mismatch = !isEmpty && ths->commitIndex != walCommitVer;
  if (mismatch) {
    sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer,
            ths->commitIndex);
    snapStart = walCommitVer + 1;
  } else {
    snapStart = ths->commitIndex + 1;
  }

  sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
  return snapStart;
}

// Handle a PRE_SNAPSHOT message on the receiver side.
// - Idle receiver, or a message with a newer startTime: (re)start the receiver.
//   If the message's startTime trails the local clock by more than
//   SNAPSHOT_MAX_CLOCK_SKEW_MS, it is rejected instead. Otherwise, wait until
//   the local clock reaches startTime, stop any running receiver, and start it
//   with the sender's startTime.
// - Same startTime as the running receiver: only reply.
// - Older startTime: reject (TSDB_CODE_SYN_INTERNAL_ERROR).
// The reply carries ack = pMsg->seq, the result code, and this node's snapshot
// begin index. Returns the result code, or -1 if the reply cannot be built or sent.
// NOTE(review): the skew check only bounds a startTime in the past. A startTime
// far in the future keeps this handler sleeping in the wait loop.
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int64_t                timeNow = taosGetTimestampMs();
  int32_t                code = 0;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    if (pMsg->startTime > pReceiver->startTime) {
      // The message belongs to a newer transfer, so the receiver's startTime is
      // the smaller one. The log message previously had the comparison reversed.
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " start receiver",
             pReceiver->startTime, pMsg->startTime);
      goto _START_RECEIVER;
    } else if (pMsg->startTime == pReceiver->startTime) {
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
             pReceiver->startTime, pMsg->startTime);
      goto _SEND_REPLY;
    } else {
      // The message is from an older transfer: the receiver's startTime is larger.
      sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " ignore",
              pReceiver->startTime, pMsg->startTime);
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      code = terrno;
      goto _SEND_REPLY;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
    goto _START_RECEIVER;
  }

_START_RECEIVER:
  if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
    sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
            pMsg->startTime);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    code = terrno;
  } else {
    // waiting for clock match
    while (timeNow < pMsg->startTime) {
      sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
             pMsg->startTime);
      taosMsleep(10);
      timeNow = taosGetTimestampMs();
    }

    if (snapshotReceiverIsStart(pReceiver)) {
      sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
      snapshotReceiverStop(pReceiver);
    }

    snapshotReceiverStart(pReceiver, pMsg);  // set start-time same with sender
  }

_SEND_REPLY:
  ;  // a declaration cannot directly follow a label in C11

  // build msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pMsg->seq;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver failed to send resp since %s", terrstr());
    return -1;
  }

  return code;
}

// Handle a BEGIN message on the receiver side.
// The receiver must already be started by a PRE_SNAPSHOT with the same
// startTime. If so, the FSM writer is opened via snapshotReceiverStartWriter().
// The reply carries the receiver's current ack and snapshot begin index plus
// the result code.
// Returns 0 on success, the error code placed in the reply on failure, or -1 if
// the reply cannot be built or sent.
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "snapshot receiver begin failed since not start");
    // set terrno so the reply does not pick up a stale, unrelated error code
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _SEND_REPLY;
  }

  if (pReceiver->startTime != pMsg->startTime) {
    sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
            pReceiver->startTime, pMsg->startTime);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _SEND_REPLY;
  }

  // start writer
  if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) {
    sRError(pReceiver, "snapshot receiver begin failed since start writer failed");
    goto _SEND_REPLY;
  }

  code = 0;
_SEND_REPLY:
  // on failure, prefer the specific error recorded in terrno
  if (code != 0 && terrno != 0) {
    code = terrno;
  }

  // build msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return code;
}

669
// Handle a data message on the receiver side (a block in the middle of a transfer).
// First waits until the local clock reaches the sender's startTime. Then writes
// the block via snapshotReceiverGotData() and replies with the receiver's ack.
// Returns the result code placed in the reply, or -1 if the reply cannot be
// built or sent.
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // wait for the local clock to catch up with the sender's startTime
  for (int64_t timeNow = taosGetTimestampMs(); timeNow < pMsg->startTime; timeNow = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
           pMsg->startTime);
    taosMsleep(10);
  }

  int32_t code = 0;
  if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
    int32_t err = terrno;
    // NOTE(review): an error code at or above SYNC_SNAPSHOT_SEQ_INVALID (e.g. 0)
    // is presumably treated as "no real error set" and mapped to INTERNAL_ERROR.
    // Comparing against a seq constant is surprising; confirm intent.
    code = (err >= SYNC_SNAPSHOT_SEQ_INVALID) ? TSDB_CODE_SYN_INTERNAL_ERROR : err;
  }

  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver received");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return code;
}

M
Minghao Li 已提交
719 720 721 722 723 724 725 726
/*
 * Receiver side: handle SYNC_SNAPSHOT_SEQ_END.
 *
 * Busy-waits (10 ms steps) until the local clock reaches pMsg->startTime, finishes the
 * receiver via snapshotReceiverFinish() (stopping it on success) and replies to the sender
 * with a SyncSnapshotRsp carrying the result in `code`.
 *
 * @param pSyncNode  local sync node; pSyncNode->pNewNodeReceiver is the receiver used
 * @param pMsg       END message received from the snapshot sender
 * @return the code placed in the reply (0 on success, an error code if finishing failed),
 *         or -1 if the reply could not be built or sent
 */
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 2
  // end, finish FSM
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // waiting for clock match
  int64_t timeNow = taosGetTimestampMs();
  while (timeNow < pMsg->startTime) {
    sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
           pMsg->startTime);
    taosMsleep(10);
    timeNow = taosGetTimestampMs();
  }

  int32_t code = snapshotReceiverFinish(pReceiver, pMsg);
  if (code == 0) {
    snapshotReceiverStop(pReceiver);
  } else if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
    // Finish failed with a bare -1 (not a real error code). Report terrno instead so the
    // sender (which copies pMsg->code into terrno) gets a meaningful error, falling back to
    // an internal error exactly as syncNodeOnSnapshotReceive does.
    code = terrno;
    if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // build msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
    return -1;
  }

  return code;
}
M
Minghao Li 已提交
765

766 767
// receiver on message
//
M
Minghao Li 已提交
768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
//              if receiver already start
//                    if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
//                    if sender.start-time = receiver.start-time, maybe duplicate msg
//                    if sender.start-time < receiver.start-time, ignore
//              else
//                    waiting for clock match
//                    start receiver(reply snapshot start)
//
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
//              a. create writer with <begin, end>
//
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
//
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
//
// condition 5, got data, update ack
M
Minghao Li 已提交
785
//
786
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
S
Shengliang Guan 已提交
787 788
  SyncSnapshotSend      *pMsg = pRpcMsg->pCont;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
789

M
Minghao Li 已提交
790
  // if already drop replica, do not process
S
Shengliang Guan 已提交
791
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
M
Minghao Li 已提交
792
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
793 794
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
M
Minghao Li 已提交
795 796 797
  }

  if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
S
Shengliang Guan 已提交
798
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
799 800
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
M
Minghao Li 已提交
801 802 803 804 805 806 807
  }

  if (pMsg->term > pSyncNode->pRaftStore->currentTerm) {
    syncNodeStepDown(pSyncNode, pMsg->term);
  }
  syncNodeResetElectTimer(pSyncNode);

808
  // state, term, seq/ack
809
  int32_t code = 0;
810
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
811
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
812
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
S
Shengliang Guan 已提交
813
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
814
        code = syncNodeOnSnapshotPre(pSyncNode, pMsg);
M
Minghao Li 已提交
815
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
S
Shengliang Guan 已提交
816
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
817
        code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
M
Minghao Li 已提交
818
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
S
Shengliang Guan 已提交
819
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
820
        code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
S
Shengliang Guan 已提交
821 822
        if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
          sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
823
          code = -1;
S
Shengliang Guan 已提交
824
        }
M
Minghao Li 已提交
825
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
M
Minghao Li 已提交
826
        // force close, no response
S
Shengliang Guan 已提交
827
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
828
        snapshotReceiverStop(pReceiver);
M
Minghao Li 已提交
829
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
830
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
831
        code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
M
Minghao Li 已提交
832
      } else {
M
Minghao Li 已提交
833
        // error log
S
Shengliang Guan 已提交
834
        sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
835
        code = -1;
836
      }
837 838
    } else {
      // error log
S
Shengliang Guan 已提交
839
      sRError(pReceiver, "snapshot receiver term not equal");
840
      code = -1;
M
Minghao Li 已提交
841
    }
M
Minghao Li 已提交
842
  } else {
843
    // error log
S
Shengliang Guan 已提交
844
    sRError(pReceiver, "snapshot receiver not follower");
845
    code = -1;
M
Minghao Li 已提交
846
  }
M
Minghao Li 已提交
847

848
  return code;
M
Minghao Li 已提交
849 850
}

851
/*
 * Sender (leader) side: handle the receiver's reply to SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT.
 *
 * Fixes the snapshot range to <pMsg->snapBeginIndex, local lastApplyIndex>, opens the FSM
 * snapshot reader into pSender->pReader, sets the peer's next index to lastApplyIndex + 1,
 * moves the sender to SYNC_SNAPSHOT_SEQ_BEGIN and sends the BEGIN message.
 *
 * @param pSyncNode  local sync node
 * @param pSender    snapshot sender bound to the replying peer
 * @param pMsg       PRE_SNAPSHOT response from the receiver
 * @return 0 on success, -1 on failure
 */
static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // prepare <begin, end>
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapshot.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);

  // the receiver cannot ask to begin past what this node has applied
  if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
    // both indexes are 64-bit (they are logged with PRId64 above); %d here was undefined behavior
    sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
            pMsg->snapBeginIndex, snapshot.lastApplyIndex);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update sender
  pSender->snapshot = snapshot;

  // start reader
  int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code != 0) {
    sSError(pSender, "prepare snapshot failed since %s", terrstr());
    return -1;
  }

  // update next index
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);

  // update seq
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // build begin msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "prepare snapshot failed since build msg error");
    return -1;
  }

  SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
  pSendMsg->srcId = pSender->pSyncNode->myRaftId;
  pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pSendMsg->beginIndex = pSender->snapshotParam.start;
  pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pSendMsg->lastConfig = pSender->lastConfig;
  pSendMsg->startTime = pSender->startTime;
  pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // send msg
  syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
  if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "prepare snapshot failed since send msg error");
    return -1;
  }

  return 0;
}

914 915 916 917 918 919
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
S
Shengliang Guan 已提交
920
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
921 922
  SyncSnapshotRsp *pMsg = pRpcMsg->pCont;

923
  // if already drop replica, do not process
924
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
M
Minghao Li 已提交
925
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
926
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
927
    return -1;
928 929
  }

M
Minghao Li 已提交
930
  // get sender
S
Shengliang Guan 已提交
931 932 933 934 935 936
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
937 938

  // state, term, seq/ack
939
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
940
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
941
    sSError(pSender, "snapshot sender not leader");
942
    terrno = TSDB_CODE_SYN_NOT_LEADER;
943 944 945 946 947 948 949
    goto _ERROR;
  }

  if (pMsg->startTime != pSender->startTime) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
    sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime,
            pSender->startTime, pMsg->code);
950
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
951
    goto _ERROR;
952
  }
M
Minghao Li 已提交
953

954
  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
955 956 957
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
    sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
            pSyncNode->pRaftStore->currentTerm);
958
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
959
    goto _ERROR;
960
  }
S
Shengliang Guan 已提交
961

962 963 964
  if (pMsg->code != 0) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
    sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code);
965
    terrno = pMsg->code;
966
    goto _ERROR;
967
  }
M
Minghao Li 已提交
968

969 970 971
  // prepare <begin, end>, send begin msg
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
972
    return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg);
973
  }
974

975 976 977 978 979 980 981
  if (pSender->pReader == NULL || pSender->finish) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
    sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
    terrno = pMsg->code;
    goto _ERROR;
  }

982 983 984 985 986
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
      return -1;
    }
987

988 989 990 991 992 993 994 995 996 997 998
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
    return 0;
  }

  // receive ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
999
    syncLogReplMgrReset(pMgr);
1000 1001 1002 1003 1004 1005 1006 1007
    return 0;
  }

  // send next msg
  if (pMsg->ack == pSender->seq) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
    // update sender ack
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
1008
      return -1;
1009
    }
1010 1011 1012 1013 1014 1015
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
  } else if (pMsg->ack == pSender->seq - 1) {
    // maybe resend
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
1016 1017 1018
    if (snapshotReSend(pSender) != 0) {
      return -1;
    }
M
Minghao Li 已提交
1019
  } else {
1020
    // error log
1021 1022 1023 1024
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
    sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
1025
    syncLogReplMgrReset(pMgr);
1026
    return -1;
1027 1028 1029
  }

  return 0;
1030 1031 1032 1033

_ERROR:
  snapshotSenderStop(pSender, true);
  SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
1034
  syncLogReplMgrReset(pMgr);
1035 1036

  return -1;
1037
}