syncSnapshot.c 35.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21
#include "syncRaftLog.h"
M
Minghao Li 已提交
22
#include "syncRaftStore.h"
M
Minghao Li 已提交
23
#include "syncReplication.h"
M
Minghao Li 已提交
24
#include "syncUtil.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
M
Minghao Li 已提交
27 28
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
S
Shengliang Guan 已提交
29
  if (!condition) return NULL;
M
Minghao Li 已提交
30

S
Shengliang Guan 已提交
31 32 33 34
  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
M
Minghao Li 已提交
35
  }
36

S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45
  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
46
  pSender->term = raftStoreGetTerm(pSyncNode);
S
Shengliang Guan 已提交
47 48 49 50 51
  pSender->startTime = 0;
  pSender->endTime = 0;
  pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
  pSender->finish = false;

M
Minghao Li 已提交
52 53
  return pSender;
}
M
Minghao Li 已提交
54

M
Minghao Li 已提交
55
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
S
Shengliang Guan 已提交
56
  if (pSender == NULL) return;
57

S
Shengliang Guan 已提交
58 59 60 61 62
  // free current block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }
63

S
Shengliang Guan 已提交
64 65 66 67
  // close reader
  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
M
Minghao Li 已提交
68
  }
S
Shengliang Guan 已提交
69 70 71

  // free sender
  taosMemoryFree(pSender);
M
Minghao Li 已提交
72
}
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }

M
Minghao Li 已提交
76
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
77 78 79
  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
M
Minghao Li 已提交
80 81 82 83 84 85 86 87 88 89
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
90

91
  memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
M
Minghao Li 已提交
92
  pSender->sendingMS = 0;
93
  pSender->term = raftStoreGetTerm(pSender->pSyncNode);
M
Minghao Li 已提交
94
  pSender->startTime = taosGetTimestampMs();
95
  pSender->lastSendTime = pSender->startTime;
M
Minghao Li 已提交
96
  pSender->finish = false;
M
Minghao Li 已提交
97

M
Minghao Li 已提交
98
  // build begin msg
99
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
100 101 102 103
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }
104 105

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
106
  pMsg->srcId = pSender->pSyncNode->myRaftId;
107
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
108
  pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
109
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
110 111
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
112 113
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
114
  pMsg->startTime = pSender->startTime;
115
  pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
M
Minghao Li 已提交
116

117 118 119
  // event log
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");

M
Minghao Li 已提交
120
  // send msg
S
Shengliang Guan 已提交
121 122 123 124
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }
125

126
  return 0;
M
Minghao Li 已提交
127 128
}

129
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
S
Shengliang Guan 已提交
130 131
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

M
Minghao Li 已提交
132 133 134
  // update flag
  pSender->start = false;
  pSender->finish = finish;
M
Minghao Li 已提交
135
  pSender->endTime = taosGetTimestampMs();
M
Minghao Li 已提交
136

137
  // close reader
M
Minghao Li 已提交
138
  if (pSender->pReader != NULL) {
139
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
M
Minghao Li 已提交
140 141
    pSender->pReader = NULL;
  }
M
Minghao Li 已提交
142

143
  // free current block
M
Minghao Li 已提交
144 145
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
146
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
147 148 149 150
    pSender->blockLen = 0;
  }
}

151
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
152
// seq = ack + 1, already updated
153
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
154
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
155 156
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
157
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
158 159 160 161 162
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
S
Shengliang Guan 已提交
163 164 165 166 167 168
                                                           &pSender->pCurrentBlock, &pSender->blockLen);
  if (ret != 0) {
    sSError(pSender, "snapshot sender read failed since %s", terrstr());
    return -1;
  }

M
Minghao Li 已提交
169 170
  if (pSender->blockLen > 0) {
    // has read data
171 172
    sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
            pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
173
  } else {
174
    // read finish, update seq to end
M
Minghao Li 已提交
175
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
176 177
    sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
           pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
178
  }
M
Minghao Li 已提交
179

M
Minghao Li 已提交
180
  // build msg
181
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
182
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
183
    sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
S
Shengliang Guan 已提交
184 185
    return -1;
  }
186 187

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
188
  pMsg->srcId = pSender->pSyncNode->myRaftId;
189
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
190
  pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
191
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
192 193
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
194 195
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
196
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
197

M
Minghao Li 已提交
198 199 200
  if (pSender->pCurrentBlock != NULL) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }
M
Minghao Li 已提交
201

202
  // event log
S
Shengliang Guan 已提交
203
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
S
Shengliang Guan 已提交
204
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
S
Shengliang Guan 已提交
205
  } else {
S
Shengliang Guan 已提交
206
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
S
Shengliang Guan 已提交
207
  }
208 209 210 211 212 213 214 215

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
M
Minghao Li 已提交
216 217 218
  return 0;
}

M
Minghao Li 已提交
219 220
// send snapshot data from cache
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
221 222
  // build msg
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
223 224 225 226
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }
227 228 229

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pSender->pSyncNode->myRaftId;
230
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
231
  pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
232 233 234 235 236 237 238 239
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;

  if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
M
Minghao Li 已提交
240
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
241
  }
M
Minghao Li 已提交
242

243 244 245
  // event log
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");

246
  // send msg
S
Shengliang Guan 已提交
247 248 249 250
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
    return -1;
  }
251

252
  pSender->lastSendTime = taosGetTimestampMs();
M
Minghao Li 已提交
253 254 255
  return 0;
}

S
Shengliang Guan 已提交
256 257 258 259 260 261 262
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->ack != pSender->seq) {
    sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

263
  pSender->ack = pMsg->ack;
S
Shengliang Guan 已提交
264 265 266 267
  pSender->seq++;

  sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
  return 0;
268 269
}

M
Minghao Li 已提交
270 271 272
// return 0, start ok
// return 1, last snapshot finish ok
// return -1, error
273
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
S
Shengliang Guan 已提交
274
  sNInfo(pSyncNode, "snapshot sender starting ...");
M
Minghao Li 已提交
275

276 277
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
S
Shengliang Guan 已提交
278
    sNError(pSyncNode, "snapshot sender start error since get failed");
M
Minghao Li 已提交
279
    return -1;
280 281
  }

M
Minghao Li 已提交
282
  if (snapshotSenderIsStart(pSender)) {
283
    sSInfo(pSender, "snapshot sender already start, ignore");
M
Minghao Li 已提交
284 285 286
    return 0;
  }

S
Shengliang Guan 已提交
287 288
  if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
    sSInfo(pSender, "snapshot sender start too frequently, ignore");
289
    return 0;
M
Minghao Li 已提交
290 291
  }

292
  sSInfo(pSender, "snapshot sender start");
293

S
Shengliang Guan 已提交
294
  int32_t code = snapshotSenderStart(pSender);
M
Minghao Li 已提交
295
  if (code != 0) {
S
Shengliang Guan 已提交
296
    sSError(pSender, "snapshot sender start error since %s", terrstr());
M
Minghao Li 已提交
297 298
    return -1;
  }
299 300 301

  return 0;
}
302

303
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
M
Minghao Li 已提交
304 305
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
S
Shengliang Guan 已提交
306
  if (!condition) return NULL;
307

S
Shengliang Guan 已提交
308 309 310 311
  SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
M
Minghao Li 已提交
312
  }
313

S
Shengliang Guan 已提交
314 315 316 317 318
  pReceiver->start = false;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;
  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
319
  pReceiver->term = raftStoreGetTerm(pSyncNode);
S
Shengliang Guan 已提交
320 321 322 323 324
  pReceiver->snapshot.data = NULL;
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pReceiver->snapshot.lastApplyTerm = 0;
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

325 326
  return pReceiver;
}
M
Minghao Li 已提交
327

328
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
S
Shengliang Guan 已提交
329
  if (pReceiver == NULL) return;
330

S
Shengliang Guan 已提交
331 332 333 334 335 336 337 338
  // close writer
  if (pReceiver->pWriter != NULL) {
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
    }
    pReceiver->pWriter = NULL;
339
  }
S
Shengliang Guan 已提交
340 341 342

  // free receiver
  taosMemoryFree(pReceiver);
343
}
M
Minghao Li 已提交
344

345 346 347
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return (pReceiver != NULL ? pReceiver->start : false);
}
348

349
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
S
Shengliang Guan 已提交
350
  if (pReceiver->pWriter != NULL) {
351
    sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
S
Shengliang Guan 已提交
352 353 354
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
M
Minghao Li 已提交
355 356 357 358 359 360 361 362 363 364 365 366

  // update ack
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // update snapshot
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // start writer
S
Shengliang Guan 已提交
367 368 369 370 371 372
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
                                                                 &pReceiver->pWriter);
  if (ret != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
    return -1;
  }
M
Minghao Li 已提交
373 374

  // event log
S
Shengliang Guan 已提交
375
  sRInfo(pReceiver, "snapshot receiver start write");
M
Minghao Li 已提交
376 377 378
  return 0;
}

379
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
380
  if (snapshotReceiverIsStart(pReceiver)) {
S
Shengliang Guan 已提交
381
    sRInfo(pReceiver, "snapshot receiver has started");
382
    return;
383
  }
M
Minghao Li 已提交
384 385

  pReceiver->start = true;
386
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
387
  pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode);
M
Minghao Li 已提交
388 389 390 391
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->startTime = pPreMsg->startTime;

  // event log
S
Shengliang Guan 已提交
392
  sRInfo(pReceiver, "snapshot receiver is start");
393
}
M
Minghao Li 已提交
394

395
// just set start = false
S
Shengliang Guan 已提交
396
// FpSnapshotStopWrite should not be called
397
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
S
Shengliang Guan 已提交
398 399
  sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

M
Minghao Li 已提交
400
  if (pReceiver->pWriter != NULL) {
401
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
S
Shengliang Guan 已提交
402 403 404 405
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
    }
M
Minghao Li 已提交
406
    pReceiver->pWriter = NULL;
S
Shengliang Guan 已提交
407 408
  } else {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
409
  }
M
Minghao Li 已提交
410 411

  pReceiver->start = false;
412 413
}

414
// when recv last snapshot block, apply data into snapshot
415 416
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;
417
  if (pReceiver->pWriter != NULL) {
418
    // write data
S
Shengliang Guan 已提交
419
    sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
420 421 422
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
423
      if (code != 0) {
S
Shengliang Guan 已提交
424
        sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
425 426 427 428 429
        return -1;
      }
    }

    // reset wal
S
Shengliang Guan 已提交
430
    sRInfo(pReceiver, "snapshot receiver log restore");
431 432 433
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
S
Shengliang Guan 已提交
434
      sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
435
      return -1;
436 437
    }

438 439 440 441 442
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

M
Minghao Li 已提交
443
    // maybe update term
444 445
    if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
      raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
M
Minghao Li 已提交
446 447
    }

448
    // stop writer, apply data
S
Shengliang Guan 已提交
449
    sRInfo(pReceiver, "snapshot receiver apply write");
450
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
S
Shengliang Guan 已提交
451
                                                           &pReceiver->snapshot);
452
    if (code != 0) {
S
Shengliang Guan 已提交
453
      sRError(pReceiver, "snapshot receiver apply failed  since %s", terrstr());
454 455
      return -1;
    }
456 457
    pReceiver->pWriter = NULL;

458 459
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
460

461
  } else {
S
Shengliang Guan 已提交
462
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
463
    return -1;
464 465 466
  }

  // event log
S
Shengliang Guan 已提交
467
  sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
468
  return 0;
469 470
}

471 472
// apply data block
// update progress
S
Shengliang Guan 已提交
473 474 475
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
476
    terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
S
Shengliang Guan 已提交
477 478
    return -1;
  }
479

S
Shengliang Guan 已提交
480 481 482 483 484
  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
485

S
Shengliang Guan 已提交
486
  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
487

S
Shengliang Guan 已提交
488 489 490 491 492 493 494 495
  if (pMsg->dataLen > 0) {
    // apply data block
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                 pMsg->data, pMsg->dataLen);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
      return -1;
    }
496
  }
S
Shengliang Guan 已提交
497 498 499 500 501 502 503

  // update progress
  pReceiver->ack = pMsg->seq;

  // event log
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
504 505
}

M
Minghao Li 已提交
506 507 508 509 510
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  SyncIndex snapStart = SYNC_INDEX_INVALID;

  if (syncNodeIsMnode(ths)) {
    snapStart = SYNC_INDEX_BEGIN;
S
Shengliang Guan 已提交
511
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
M
Minghao Li 已提交
512 513 514 515 516
  } else {
    SSyncLogStoreData *pData = ths->pLogStore->data;
    SWal              *pWal = pData->pWal;

    int64_t walCommitVer = walGetCommittedVer(pWal);
517
    snapStart = TMAX(ths->commitIndex, walCommitVer) + 1;
S
Shengliang Guan 已提交
518 519

    sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
M
Minghao Li 已提交
520 521 522 523 524
  }

  return snapStart;
}

525
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
526
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
S
Shengliang Guan 已提交
527
  int64_t                timeNow = taosGetTimestampMs();
528
  int32_t                code = 0;
M
Minghao Li 已提交
529 530 531 532

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    if (pMsg->startTime > pReceiver->startTime) {
S
Shengliang Guan 已提交
533 534
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " start receiver",
             pReceiver->startTime, pMsg->startTime);
M
Minghao Li 已提交
535 536
      goto _START_RECEIVER;
    } else if (pMsg->startTime == pReceiver->startTime) {
S
Shengliang Guan 已提交
537 538
      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
             pReceiver->startTime, pMsg->startTime);
M
Minghao Li 已提交
539 540 541
      goto _SEND_REPLY;
    } else {
      // ignore
542 543 544 545 546
      sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
              pReceiver->startTime, pMsg->startTime);
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      code = terrno;
      goto _SEND_REPLY;
M
Minghao Li 已提交
547 548 549
    }
  } else {
    // start new
S
Shengliang Guan 已提交
550
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
M
Minghao Li 已提交
551 552 553 554
    goto _START_RECEIVER;
  }

_START_RECEIVER:
S
Shengliang Guan 已提交
555 556 557
  if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
    sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
            pMsg->startTime);
558 559
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    code = terrno;
M
Minghao Li 已提交
560 561
  } else {
    // waiting for clock match
M
Minghao Li 已提交
562
    while (timeNow < pMsg->startTime) {
563
      sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", startTime:%" PRId64, timeNow,
S
Shengliang Guan 已提交
564
             pMsg->startTime);
M
Minghao Li 已提交
565
      taosMsleep(10);
566
      timeNow = taosGetTimestampMs();
M
Minghao Li 已提交
567 568
    }

569
    if (snapshotReceiverIsStart(pReceiver)) {
S
Shengliang Guan 已提交
570
      sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
571
      snapshotReceiverStop(pReceiver);
572 573
    }

M
Minghao Li 已提交
574 575 576 577 578 579
    snapshotReceiverStart(pReceiver, pMsg);  // set start-time same with sender
  }

_SEND_REPLY:
    // build msg
    ;  // make complier happy
580 581

  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
582 583 584 585
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
    return -1;
  }
586 587

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
M
Minghao Li 已提交
588 589
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
590
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
M
Minghao Li 已提交
591 592 593 594
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pMsg->seq;  // receiver maybe already closed
595
  pRspMsg->code = code;
M
Minghao Li 已提交
596 597 598
  pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);

  // send msg
S
Shengliang Guan 已提交
599 600 601 602 603 604
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
    return -1;
  }

605
  return code;
M
Minghao Li 已提交
606 607 608 609 610
}

static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 1
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
611
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;
M
Minghao Li 已提交
612

M
Minghao Li 已提交
613
  if (!snapshotReceiverIsStart(pReceiver)) {
614 615
    sRError(pReceiver, "snapshot receiver begin failed since not start");
    goto _SEND_REPLY;
M
Minghao Li 已提交
616 617
  }

M
Minghao Li 已提交
618
  if (pReceiver->startTime != pMsg->startTime) {
619
    sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
S
Shengliang Guan 已提交
620
            pReceiver->startTime, pMsg->startTime);
621
    goto _SEND_REPLY;
M
Minghao Li 已提交
622
  }
M
Minghao Li 已提交
623

M
Minghao Li 已提交
624
  // start writer
625 626 627 628 629 630 631 632 633 634
  if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) {
    sRError(pReceiver, "snapshot receiver begin failed since start writer failed");
    goto _SEND_REPLY;
  }

  code = 0;
_SEND_REPLY:
  if (code != 0 && terrno != 0) {
    code = terrno;
  }
M
Minghao Li 已提交
635

M
Minghao Li 已提交
636
  // build msg
637
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
638 639 640 641
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }
642 643

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
M
Minghao Li 已提交
644 645
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
646
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
M
Minghao Li 已提交
647 648 649 650
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
651
  pRspMsg->code = code;
M
Minghao Li 已提交
652
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
M
Minghao Li 已提交
653

M
Minghao Li 已提交
654
  // send msg
S
Shengliang Guan 已提交
655 656 657 658 659 660
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

661
  return code;
M
Minghao Li 已提交
662 663
}

664
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
665 666 667 668 669 670 671
  // condition 4
  // transfering
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // waiting for clock match
  int64_t timeNow = taosGetTimestampMs();
  while (timeNow < pMsg->startTime) {
S
Shengliang Guan 已提交
672 673
    sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
           pMsg->startTime);
M
Minghao Li 已提交
674
    taosMsleep(10);
S
Shengliang Guan 已提交
675
    timeNow = taosGetTimestampMs();
M
Minghao Li 已提交
676 677
  }

678
  int32_t code = 0;
S
Shengliang Guan 已提交
679
  if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
680 681 682 683
    code = terrno;
    if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
M
Minghao Li 已提交
684 685
  }

M
Minghao Li 已提交
686
  // build msg
687
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
688 689 690 691
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId)) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }
692 693

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
M
Minghao Li 已提交
694 695
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
696
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
M
Minghao Li 已提交
697 698 699 700
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
701
  pRspMsg->code = code;
M
Minghao Li 已提交
702 703 704
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
705
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver received");
S
Shengliang Guan 已提交
706 707 708 709
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }
710

711
  return code;
M
Minghao Li 已提交
712 713
}

M
Minghao Li 已提交
714 715 716 717 718 719 720 721
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 2
  // end, finish FSM
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // waiting for clock match
  int64_t timeNow = taosGetTimestampMs();
  while (timeNow < pMsg->startTime) {
S
Shengliang Guan 已提交
722 723
    sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
           pMsg->startTime);
M
Minghao Li 已提交
724
    taosMsleep(10);
M
Minghao Li 已提交
725
    timeNow = taosGetTimestampMs();
M
Minghao Li 已提交
726 727 728 729 730 731 732 733
  }

  int32_t code = snapshotReceiverFinish(pReceiver, pMsg);
  if (code == 0) {
    snapshotReceiverStop(pReceiver);
  }

  // build msg
734
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
735 736 737 738
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
    return -1;
  }
739 740

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
M
Minghao Li 已提交
741 742
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
743
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
M
Minghao Li 已提交
744 745 746 747
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
748
  pRspMsg->code = code;
M
Minghao Li 已提交
749 750 751
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
S
Shengliang Guan 已提交
752 753 754 755 756 757
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
    return -1;
  }

758
  return code;
M
Minghao Li 已提交
759
}
M
Minghao Li 已提交
760

761 762
// receiver on message
//
763
// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT
M
Minghao Li 已提交
764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779
//              if receiver already start
//                    if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
//                    if sender.start-time = receiver.start-time, maybe duplicate msg
//                    if sender.start-time < receiver.start-time, ignore
//              else
//                    waiting for clock match
//                    start receiver(reply snapshot start)
//
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
//              a. create writer with <begin, end>
//
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
//
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
//
// condition 5, got data, update ack
M
Minghao Li 已提交
780
//
781
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
S
Shengliang Guan 已提交
782 783
  SyncSnapshotSend      *pMsg = pRpcMsg->pCont;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
784

M
Minghao Li 已提交
785
  // if already drop replica, do not process
S
Shengliang Guan 已提交
786
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
M
Minghao Li 已提交
787
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
788 789
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
M
Minghao Li 已提交
790 791
  }

792
  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
S
Shengliang Guan 已提交
793
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
794 795
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
M
Minghao Li 已提交
796 797
  }

798
  if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
M
Minghao Li 已提交
799 800 801 802
    syncNodeStepDown(pSyncNode, pMsg->term);
  }
  syncNodeResetElectTimer(pSyncNode);

803
  // state, term, seq/ack
804
  int32_t code = 0;
805
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
806
    if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
807
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
S
Shengliang Guan 已提交
808
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
809
        code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
M
Minghao Li 已提交
810
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
S
Shengliang Guan 已提交
811
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
812
        code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
M
Minghao Li 已提交
813
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
S
Shengliang Guan 已提交
814
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
815
        code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
S
Shengliang Guan 已提交
816 817
        if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
          sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
818
          code = -1;
S
Shengliang Guan 已提交
819
        }
M
Minghao Li 已提交
820
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
M
Minghao Li 已提交
821
        // force close, no response
S
Shengliang Guan 已提交
822
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
823
        snapshotReceiverStop(pReceiver);
M
Minghao Li 已提交
824
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
825
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
826
        code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
M
Minghao Li 已提交
827
      } else {
M
Minghao Li 已提交
828
        // error log
S
Shengliang Guan 已提交
829
        sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
830
        code = -1;
831
      }
832 833
    } else {
      // error log
S
Shengliang Guan 已提交
834
      sRError(pReceiver, "snapshot receiver term not equal");
835
      code = -1;
M
Minghao Li 已提交
836
    }
M
Minghao Li 已提交
837
  } else {
838
    // error log
S
Shengliang Guan 已提交
839
    sRError(pReceiver, "snapshot receiver not follower");
840
    code = -1;
M
Minghao Li 已提交
841
  }
M
Minghao Li 已提交
842

843
  return code;
M
Minghao Li 已提交
844 845
}

846
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
S
Shengliang Guan 已提交
847
  SSnapshot snapshot = {0};
M
Minghao Li 已提交
848 849 850 851 852 853
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // prepare <begin, end>
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapshot.lastApplyIndex;

S
Shengliang Guan 已提交
854 855
  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
M
Minghao Li 已提交
856

M
Minghao Li 已提交
857
  if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
858 859
    sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
            pMsg->snapBeginIndex, snapshot.lastApplyIndex);
S
Shengliang Guan 已提交
860
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
M
Minghao Li 已提交
861 862 863
    return -1;
  }

M
Minghao Li 已提交
864 865 866
  // update sender
  pSender->snapshot = snapshot;

M
Minghao Li 已提交
867
  // start reader
868
  int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
M
Minghao Li 已提交
869
  if (code != 0) {
S
Shengliang Guan 已提交
870
    sSError(pSender, "prepare snapshot failed since %s", terrstr());
M
Minghao Li 已提交
871 872 873
    return -1;
  }

M
Minghao Li 已提交
874
  // update next index
875
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
M
Minghao Li 已提交
876

M
Minghao Li 已提交
877 878 879
  // update seq
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

M
Minghao Li 已提交
880
  // build begin msg
881
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
882 883 884 885
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "prepare snapshot failed since build msg error");
    return -1;
  }
886 887

  SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
M
Minghao Li 已提交
888
  pSendMsg->srcId = pSender->pSyncNode->myRaftId;
889
  pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
890
  pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode);
M
Minghao Li 已提交
891 892 893 894 895 896 897 898 899
  pSendMsg->beginIndex = pSender->snapshotParam.start;
  pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pSendMsg->lastConfig = pSender->lastConfig;
  pSendMsg->startTime = pSender->startTime;
  pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // send msg
S
Shengliang Guan 已提交
900 901 902 903 904
  syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
  if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "prepare snapshot failed since send msg error");
    return -1;
  }
M
Minghao Li 已提交
905 906 907 908

  return 0;
}

909 910 911 912 913 914
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
S
Shengliang Guan 已提交
915
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
916 917
  SyncSnapshotRsp *pMsg = pRpcMsg->pCont;

918
  // if already drop replica, do not process
919
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
M
Minghao Li 已提交
920
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
921
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
922
    return -1;
923 924
  }

M
Minghao Li 已提交
925
  // get sender
S
Shengliang Guan 已提交
926 927 928 929 930 931
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
932 933

  // state, term, seq/ack
934
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
935
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
936
    sSError(pSender, "snapshot sender not leader");
937
    terrno = TSDB_CODE_SYN_NOT_LEADER;
938 939 940 941 942
    goto _ERROR;
  }

  if (pMsg->startTime != pSender->startTime) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
943 944
    sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, error:%s 0x%x", pMsg->startTime,
            pSender->startTime, tstrerror(pMsg->code), pMsg->code);
945
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
946
    goto _ERROR;
947
  }
M
Minghao Li 已提交
948

949 950
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (pMsg->term != currentTerm) {
951 952
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
    sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
953
            currentTerm);
954
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
955
    goto _ERROR;
956
  }
S
Shengliang Guan 已提交
957

958 959
  if (pMsg->code != 0) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
960
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
961
    terrno = pMsg->code;
962
    goto _ERROR;
963
  }
M
Minghao Li 已提交
964

965
  // prepare <begin, end>, send begin msg
966
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
967
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
968
    return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
969
  }
970

971 972
  if (pSender->pReader == NULL || pSender->finish) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
973 974
    sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code,
            pSender->pReader, pSender->finish);
975 976 977 978
    terrno = pMsg->code;
    goto _ERROR;
  }

979 980 981 982 983
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
      return -1;
    }
984

985 986 987 988 989 990 991 992 993 994 995
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
    return 0;
  }

  // receive ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
996
    syncLogReplMgrReset(pMgr);
997 998 999 1000 1001 1002 1003 1004
    return 0;
  }

  // send next msg
  if (pMsg->ack == pSender->seq) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
    // update sender ack
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
1005
      return -1;
1006
    }
1007 1008 1009 1010 1011 1012
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
  } else if (pMsg->ack == pSender->seq - 1) {
    // maybe resend
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
1013 1014 1015
    if (snapshotReSend(pSender) != 0) {
      return -1;
    }
M
Minghao Li 已提交
1016
  } else {
1017
    // error log
1018 1019 1020 1021
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
    sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
1022
    syncLogReplMgrReset(pMgr);
1023
    return -1;
1024 1025 1026
  }

  return 0;
1027 1028 1029 1030

_ERROR:
  snapshotSenderStop(pSender, true);
  SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
1031
  syncLogReplMgrReset(pMgr);
1032 1033

  return -1;
1034
}