syncSnapshot.c 35.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21
#include "syncRaftLog.h"
M
Minghao Li 已提交
22
#include "syncRaftStore.h"
M
Minghao Li 已提交
23
#include "syncReplication.h"
M
Minghao Li 已提交
24
#include "syncUtil.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26
// Allocate and initialize a snapshot sender for the replica at replicaIndex.
// Returns NULL when the FSM is missing any of the snapshot read callbacks, or
// when allocation fails (terrno is then set to TSDB_CODE_OUT_OF_MEMORY).
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // all three read callbacks are required
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // owner and target
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;
  pNew->term = pSyncNode->raftStore.currentTerm;

  // transfer progress
  pNew->start = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // reader and cached block
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;

  // timing
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pNew->startTime = 0;
  pNew->endTime = 0;

  // fill pNew->snapshot from the FSM
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &pNew->snapshot);
  pNew->finish = false;

  return pNew;
}
M
Minghao Li 已提交
54

M
Minghao Li 已提交
55
// Release a snapshot sender: free the cached block, stop an open reader through
// the FSM, then free the sender itself. A NULL sender is ignored.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // drop the cached block, if one is held
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // hand an open reader back to the FSM
  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75
// Report the sender's start flag.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

M
Minghao Li 已提交
76
// Reset the sender's transfer state, stamp the start time, and send the
// prepare message (seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) to the target replica.
// Returns 0 on success, -1 when the message cannot be built or sent.
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  SSyncNode *pNode = pSender->pSyncNode;

  // progress flags and sequence numbers
  pSender->start = true;
  pSender->finish = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // reader and cached block
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;

  // snapshot range and meta are unknown at this point
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
  memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));

  // term and timing
  pSender->sendingMS = 0;
  pSender->term = pNode->raftStore.currentTerm;
  pSender->startTime = taosGetTimestampMs();
  pSender->lastSendTime = pSender->startTime;

  // build the prepare message (no payload)
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pNode->myRaftId;
  pMsg->destId = pNode->replicasId[pSender->replicaIndex];
  pMsg->term = pNode->raftStore.currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->startTime = pSender->startTime;
  pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;

  // event log
  syncLogSendSyncSnapshotSend(pNode, pMsg, "snapshot sender start");

  // send
  if (syncNodeSendMsgById(&pMsg->destId, pNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  return 0;
}

129
// Stop the sender: clear the start flag, record `finish` and the end time,
// stop an open reader through the FSM, and free the cached block.
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // flags and end timestamp
  pSender->start = false;
  pSender->finish = finish;
  pSender->endTime = taosGetTimestampMs();

  // give the reader back to the FSM
  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  // drop the cached block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }
}

151
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
152
// seq = ack + 1, already updated
153
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
154
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
155 156
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
157
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
158 159 160 161 162
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
S
Shengliang Guan 已提交
163 164 165 166 167 168
                                                           &pSender->pCurrentBlock, &pSender->blockLen);
  if (ret != 0) {
    sSError(pSender, "snapshot sender read failed since %s", terrstr());
    return -1;
  }

M
Minghao Li 已提交
169 170
  if (pSender->blockLen > 0) {
    // has read data
171
    sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
172
  } else {
173
    // read finish, update seq to end
M
Minghao Li 已提交
174
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
S
Shengliang Guan 已提交
175
    sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
M
Minghao Li 已提交
176
  }
M
Minghao Li 已提交
177

M
Minghao Li 已提交
178
  // build msg
179
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
180 181 182 183
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
    return -1;
  }
184 185

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
186
  pMsg->srcId = pSender->pSyncNode->myRaftId;
187
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
S
Shengliang Guan 已提交
188
  pMsg->term = pSender->pSyncNode->raftStore.currentTerm;
189
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
190 191
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
192 193
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
194
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
195

M
Minghao Li 已提交
196 197 198
  if (pSender->pCurrentBlock != NULL) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }
M
Minghao Li 已提交
199

200
  // event log
S
Shengliang Guan 已提交
201
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
S
Shengliang Guan 已提交
202
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
S
Shengliang Guan 已提交
203
  } else {
S
Shengliang Guan 已提交
204
    syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
S
Shengliang Guan 已提交
205
  }
206 207 208 209 210 211 212 213

  // send msg
  if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
M
Minghao Li 已提交
214 215 216
  return 0;
}

M
Minghao Li 已提交
217 218
// Send the currently cached block again with the current seq; nothing is read
// from the FSM. Updates lastSendTime on success.
// Returns 0 on success, -1 when the message cannot be built or sent.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  SSyncNode *pNode = pSender->pSyncNode;

  // build a message sized for the cached block
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pNode->vgId) != 0) {
    sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pNode->myRaftId;
  pMsg->destId = pNode->replicasId[pSender->replicaIndex];
  pMsg->term = pNode->raftStore.currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;

  // copy the payload only when a non-empty block is cached
  if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // event log
  syncLogSendSyncSnapshotSend(pNode, pMsg, "snapshot sender resend");

  // send
  if (syncNodeSendMsgById(&pMsg->destId, pNode, &rpcMsg) != 0) {
    sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
    return -1;
  }

  pSender->lastSendTime = taosGetTimestampMs();
  return 0;
}

S
Shengliang Guan 已提交
254 255 256 257 258 259 260
// Accept an ack from the receiver. The ack must equal the sender's current seq;
// if so, it is recorded and seq is advanced by one.
// Returns 0 on success; on mismatch sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR
// and returns -1.
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->ack == pSender->seq) {
    pSender->ack = pMsg->ack;
    pSender->seq++;

    sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
    return 0;
  }

  sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}

M
Minghao Li 已提交
268 269 270
// return 0, start ok
// return 1, last snapshot finish ok
// return -1, error
271
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
S
Shengliang Guan 已提交
272
  sNInfo(pSyncNode, "snapshot sender starting ...");
M
Minghao Li 已提交
273

274 275
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
S
Shengliang Guan 已提交
276
    sNError(pSyncNode, "snapshot sender start error since get failed");
M
Minghao Li 已提交
277
    return -1;
278 279
  }

M
Minghao Li 已提交
280
  if (snapshotSenderIsStart(pSender)) {
281
    sSInfo(pSender, "snapshot sender already start, ignore");
M
Minghao Li 已提交
282 283 284
    return 0;
  }

S
Shengliang Guan 已提交
285 286
  if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
    sSInfo(pSender, "snapshot sender start too frequently, ignore");
287
    return 0;
M
Minghao Li 已提交
288 289
  }

290
  sSInfo(pSender, "snapshot sender start");
291

S
Shengliang Guan 已提交
292
  int32_t code = snapshotSenderStart(pSender);
M
Minghao Li 已提交
293
  if (code != 0) {
S
Shengliang Guan 已提交
294
    sSError(pSender, "snapshot sender start error since %s", terrstr());
M
Minghao Li 已提交
295 296
    return -1;
  }
297 298 299

  return 0;
}
300

301
// Allocate and initialize a snapshot receiver bound to pSyncNode and fromId.
// Returns NULL when the FSM is missing any of the snapshot write callbacks, or
// when allocation fails (terrno is then set to TSDB_CODE_OUT_OF_MEMORY).
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  // all three write callbacks are required
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // owner and peer
  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;
  pNew->term = pSyncNode->raftStore.currentTerm;

  // progress
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  // snapshot meta
  pNew->snapshot.data = NULL;
  pNew->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pNew->snapshot.lastApplyTerm = 0;
  pNew->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  return pNew;
}
M
Minghao Li 已提交
325

326
// Release a snapshot receiver. An open writer is first stopped through
// FpSnapshotStopWrite with its third argument false; a failure there is only
// logged. A NULL receiver is ignored.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // stop the writer, if one is open
  if (pReceiver->pWriter != NULL) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    if (pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot) != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
    }
    pReceiver->pWriter = NULL;
  }

  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
342

343 344
// Report the receiver's start flag.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

345
// Open the FSM snapshot writer for a transfer described by the begin message.
// Resets ack to SYNC_SNAPSHOT_SEQ_BEGIN and copies the snapshot meta and index
// range from pBeginMsg before calling FpSnapshotStartWrite.
// Returns 0 on success. Returns -1 when a writer is already open (terrno is set
// to TSDB_CODE_SYN_INTERNAL_ERROR) or when FpSnapshotStartWrite fails.
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (pReceiver->pWriter != NULL) {
    // no "vgId:%d" here: the old format string had a %d with no argument
    sRError(pReceiver, "snapshot receiver writer is not null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update ack
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // update snapshot
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // start writer
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
                                                                 &pReceiver->pWriter);
  if (ret != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
    return -1;
  }

  // event log
  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}

375
// Mark the receiver as started for the transfer announced by pPreMsg: ack is
// set to SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT, and the term, source id and start
// time are recorded. Does nothing (besides logging) if already started.
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver has started");
    return;
  }

  pReceiver->start = true;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
  pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm;

  // take the peer and the start time from the sender's message
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->startTime = pPreMsg->startTime;

  sRInfo(pReceiver, "snapshot receiver is start");
}
M
Minghao Li 已提交
390

391
// just set start = false
S
Shengliang Guan 已提交
392
// FpSnapshotStopWrite should not be called
393
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
S
Shengliang Guan 已提交
394 395
  sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

M
Minghao Li 已提交
396
  if (pReceiver->pWriter != NULL) {
397
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
S
Shengliang Guan 已提交
398 399 400 401
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
    }
M
Minghao Li 已提交
402
    pReceiver->pWriter = NULL;
S
Shengliang Guan 已提交
403 404
  } else {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
405
  }
M
Minghao Li 已提交
406 407

  pReceiver->start = false;
408 409
}

410
// when recv last snapshot block, apply data into snapshot
411 412
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;
413
  if (pReceiver->pWriter != NULL) {
414
    // write data
S
Shengliang Guan 已提交
415
    sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
416 417 418
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
419
      if (code != 0) {
S
Shengliang Guan 已提交
420
        sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
421 422 423 424 425
        return -1;
      }
    }

    // reset wal
S
Shengliang Guan 已提交
426
    sRInfo(pReceiver, "snapshot receiver log restore");
427 428 429
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
S
Shengliang Guan 已提交
430
      sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
431
      return -1;
432 433
    }

434 435 436 437 438
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

M
Minghao Li 已提交
439
    // maybe update term
S
Shengliang Guan 已提交
440 441 442
    if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->raftStore.currentTerm) {
      pReceiver->pSyncNode->raftStore.currentTerm = pReceiver->snapshot.lastApplyTerm;
      (void)raftStoreWriteFile(pReceiver->pSyncNode);
M
Minghao Li 已提交
443 444
    }

445
    // stop writer, apply data
S
Shengliang Guan 已提交
446
    sRInfo(pReceiver, "snapshot receiver apply write");
447
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
S
Shengliang Guan 已提交
448
                                                           &pReceiver->snapshot);
449
    if (code != 0) {
S
Shengliang Guan 已提交
450
      sRError(pReceiver, "snapshot receiver apply failed  since %s", terrstr());
451 452
      return -1;
    }
453 454
    pReceiver->pWriter = NULL;

455 456
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
457

458
  } else {
S
Shengliang Guan 已提交
459
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
460
    return -1;
461 462 463
  }

  // event log
S
Shengliang Guan 已提交
464
  sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
465
  return 0;
466 467
}

468 469
// apply data block
// update progress
S
Shengliang Guan 已提交
470 471 472
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
473
    terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
S
Shengliang Guan 已提交
474 475
    return -1;
  }
476

S
Shengliang Guan 已提交
477 478 479 480 481
  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
482

S
Shengliang Guan 已提交
483
  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
484

S
Shengliang Guan 已提交
485 486 487 488 489 490 491 492
  if (pMsg->dataLen > 0) {
    // apply data block
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                 pMsg->data, pMsg->dataLen);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
      return -1;
    }
493
  }
S
Shengliang Guan 已提交
494 495 496 497 498 499 500

  // update progress
  pReceiver->ack = pMsg->seq;

  // event log
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
501 502
}

M
Minghao Li 已提交
503 504 505 506 507
// Compute the index a snapshot should begin at. For an mnode this is
// SYNC_INDEX_BEGIN; otherwise it is one past the larger of the node's
// commitIndex and the WAL's committed version.
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    SyncIndex mnodeBegin = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", mnodeBegin);
    return mnodeBegin;
  }

  SSyncLogStoreData *pData = ths->pLogStore->data;
  int64_t            walCommitVer = walGetCommittedVer(pData->pWal);
  SyncIndex          beginIndex = TMAX(ths->commitIndex, walCommitVer) + 1;

  sNInfo(ths, "snapshot begin index is %" PRId64, beginIndex);
  return beginIndex;
}

522
// Handle the sender's prepare message (seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT).
//
// Decision on the receiver state:
//   - not started                      -> start the receiver
//   - started, msg startTime is newer  -> stop the old transfer, start again
//   - started, msg startTime is equal  -> just reply (duplicate prepare)
//   - started, msg startTime is older  -> reply with TSDB_CODE_SYN_INTERNAL_ERROR
// Starting is refused with TSDB_CODE_SYN_INTERNAL_ERROR when the local clock is
// more than SNAPSHOT_MAX_CLOCK_SKEW_MS ahead of the message's startTime.
//
// A response carrying the resulting code is always built and sent.
// Returns that code, or -1 when the response cannot be built or sent.
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int64_t                timeNow = taosGetTimestampMs();
  int32_t                code = 0;
  bool                   startReceiver = false;

  if (!snapshotReceiverIsStart(pReceiver)) {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
    startReceiver = true;
  } else if (pMsg->startTime > pReceiver->startTime) {
    // the sender began a newer transfer
    sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " start receiver",
           pReceiver->startTime, pMsg->startTime);
    startReceiver = true;
  } else if (pMsg->startTime == pReceiver->startTime) {
    sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
           pReceiver->startTime, pMsg->startTime);
  } else {
    // stale prepare message: ignore it but still report the error to the sender
    sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " ignore",
            pReceiver->startTime, pMsg->startTime);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    code = terrno;
  }

  if (startReceiver) {
    if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
      sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
              pMsg->startTime);
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      code = terrno;
    } else {
      // waiting for clock match
      while (timeNow < pMsg->startTime) {
        sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", startTime:%" PRId64, timeNow,
               pMsg->startTime);
        taosMsleep(10);
        timeNow = taosGetTimestampMs();
      }

      if (snapshotReceiverIsStart(pReceiver)) {
        sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
        snapshotReceiverStop(pReceiver);
      }

      snapshotReceiverStart(pReceiver, pMsg);  // set start-time same with sender
    }
  }

  // build msg
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->raftStore.currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pMsg->seq;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver failed to send resp since %s", terrstr());
    return -1;
  }

  return code;
}

// Handle the sender's begin message (condition 1): the receiver must be started
// and its startTime must match the message; then the FSM writer is opened.
// A response is always built and sent. Its code is 0 on success; otherwise it
// is terrno when terrno is non-zero, else TSDB_CODE_SYN_INTERNAL_ERROR.
// Returns that code, or -1 when the response cannot be built or sent.
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "snapshot receiver begin failed since not start");
  } else if (pReceiver->startTime != pMsg->startTime) {
    sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
            pReceiver->startTime, pMsg->startTime);
  } else if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) {
    sRError(pReceiver, "snapshot receiver begin failed since start writer failed");
  } else {
    code = 0;
  }

  // prefer the specific error recorded in terrno over the generic default
  if (code != 0 && terrno != 0) {
    code = terrno;
  }

  // build the response
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->raftStore.currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send the response
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return code;
}

661
// Handle a data message while a transfer is in progress (condition 4): wait
// until the local clock reaches the message's startTime, write the block, and
// reply with the receiver's current ack.
// Returns the code placed in the response (0 on success), or -1 when the
// response cannot be built or sent.
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // waiting for clock match
  int64_t timeNow;
  for (timeNow = taosGetTimestampMs(); timeNow < pMsg->startTime; timeNow = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
           pMsg->startTime);
    taosMsleep(10);
  }

  int32_t code = 0;
  if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
    // NOTE(review): a terrno that is not below SYNC_SNAPSHOT_SEQ_INVALID is
    // replaced by the generic internal error; presumably this covers an unset
    // terrno. Confirm the intent.
    code = terrno;
    if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // build the response
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId)) {
    sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->raftStore.currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send the response
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver received");
  if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
    sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
    return -1;
  }

  return code;
}

M
Minghao Li 已提交
711 712 713 714 715 716 717 718
// Receiver side: handle the final message of a snapshot transfer.
//
// Waits until the local clock reaches the start time carried by the message,
// finishes the receiver (stopping it only when finishing succeeded) and sends
// a SyncSnapshotRsp carrying the result code back to the message source.
//
// Returns the result of snapshotReceiverFinish() once the response was sent,
// or -1 when the response could not be built or sent.
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // poll every 10ms until the local time is not earlier than the start time in the message
  for (int64_t nowMs = taosGetTimestampMs(); nowMs < pMsg->startTime; nowMs = taosGetTimestampMs()) {
    sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, nowMs,
           pMsg->startTime);
    taosMsleep(10);
  }

  // finish the receiver; it is left running when finishing failed
  int32_t code = snapshotReceiverFinish(pReceiver, pMsg);
  if (code == 0) {
    snapshotReceiverStop(pReceiver);
  }

  // build the response
  SRpcMsg rsp = {0};
  if (syncBuildSnapshotSendRsp(&rsp, pSyncNode->vgId) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
    return -1;
  }

  SyncSnapshotRsp *pRsp = rsp.pCont;

  // routing and term
  pRsp->srcId = pSyncNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pSyncNode->raftStore.currentTerm;

  // echo the snapshot identity received from the sender
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;

  // receiver state and outcome
  pRsp->startTime = pReceiver->startTime;
  pRsp->ack = pReceiver->ack;  // receiver maybe already closed
  pRsp->code = code;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;

  // send the response
  syncLogSendSyncSnapshotRsp(pSyncNode, pRsp, "snapshot receiver end");
  if (syncNodeSendMsgById(&pRsp->destId, pSyncNode, &rsp) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
    return -1;
  }

  return code;
}
M
Minghao Li 已提交
757

758 759
// receiver on message
//
760
// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT
M
Minghao Li 已提交
761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
//              if receiver already start
//                    if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
//                    if sender.start-time = receiver.start-time, maybe duplicate msg
//                    if sender.start-time < receiver.start-time, ignore
//              else
//                    waiting for clock match
//                    start receiver(reply snapshot start)
//
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
//              a. create writer with <begin, end>
//
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
//
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
//
// condition 5, got data, update ack
M
Minghao Li 已提交
777
//
778
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
S
Shengliang Guan 已提交
779 780
  SyncSnapshotSend      *pMsg = pRpcMsg->pCont;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
781

M
Minghao Li 已提交
782
  // if already drop replica, do not process
S
Shengliang Guan 已提交
783
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
M
Minghao Li 已提交
784
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
785 786
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
M
Minghao Li 已提交
787 788
  }

S
Shengliang Guan 已提交
789
  if (pMsg->term < pSyncNode->raftStore.currentTerm) {
S
Shengliang Guan 已提交
790
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
791 792
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
M
Minghao Li 已提交
793 794
  }

S
Shengliang Guan 已提交
795
  if (pMsg->term > pSyncNode->raftStore.currentTerm) {
M
Minghao Li 已提交
796 797 798 799
    syncNodeStepDown(pSyncNode, pMsg->term);
  }
  syncNodeResetElectTimer(pSyncNode);

800
  // state, term, seq/ack
801
  int32_t code = 0;
802
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
S
Shengliang Guan 已提交
803
    if (pMsg->term == pSyncNode->raftStore.currentTerm) {
804
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
S
Shengliang Guan 已提交
805
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
806
        code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
M
Minghao Li 已提交
807
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
S
Shengliang Guan 已提交
808
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
809
        code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
M
Minghao Li 已提交
810
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
S
Shengliang Guan 已提交
811
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
812
        code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
S
Shengliang Guan 已提交
813 814
        if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
          sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
815
          code = -1;
S
Shengliang Guan 已提交
816
        }
M
Minghao Li 已提交
817
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
M
Minghao Li 已提交
818
        // force close, no response
S
Shengliang Guan 已提交
819
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
820
        snapshotReceiverStop(pReceiver);
M
Minghao Li 已提交
821
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
822
        syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
823
        code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
M
Minghao Li 已提交
824
      } else {
M
Minghao Li 已提交
825
        // error log
S
Shengliang Guan 已提交
826
        sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
827
        code = -1;
828
      }
829 830
    } else {
      // error log
S
Shengliang Guan 已提交
831
      sRError(pReceiver, "snapshot receiver term not equal");
832
      code = -1;
M
Minghao Li 已提交
833
    }
M
Minghao Li 已提交
834
  } else {
835
    // error log
S
Shengliang Guan 已提交
836
    sRError(pReceiver, "snapshot receiver not follower");
837
    code = -1;
M
Minghao Li 已提交
838
  }
M
Minghao Li 已提交
839

840
  return code;
M
Minghao Li 已提交
841 842
}

843
// Sender side: handle the receiver's reply to the pre-snapshot message.
//
// Reads the current snapshot info from the FSM, fixes the transfer range
// <begin, end> = <pMsg->snapBeginIndex, snapshot.lastApplyIndex>, starts the FSM
// snapshot reader, moves the peer's next index past the snapshot, and sends the
// SYNC_SNAPSHOT_SEQ_BEGIN message to the replica this sender serves.
//
// Returns 0 on success. Returns -1 when the requested begin index is beyond the
// snapshot (terrno = TSDB_CODE_SYN_INTERNAL_ERROR), when the reader cannot be
// started, or when the begin message cannot be built or sent.
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // prepare <begin, end>
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapshot.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);

  // the receiver must not ask for data beyond what the snapshot holds
  if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
    // both indexes are 64-bit (see the PRId64 log above), so they must not be printed with %d
    sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
            pMsg->snapBeginIndex, snapshot.lastApplyIndex);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // update sender
  pSender->snapshot = snapshot;

  // start reader
  int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code != 0) {
    sSError(pSender, "prepare snapshot failed since %s", terrstr());
    return -1;
  }

  // update next index: replication to this peer continues right after the snapshot
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);

  // update seq
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // build begin msg (no data payload)
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
    sSError(pSender, "prepare snapshot failed since build msg error");
    return -1;
  }

  SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
  pSendMsg->srcId = pSender->pSyncNode->myRaftId;
  pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  pSendMsg->term = pSender->pSyncNode->raftStore.currentTerm;
  pSendMsg->beginIndex = pSender->snapshotParam.start;
  pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pSendMsg->lastConfig = pSender->lastConfig;
  pSendMsg->startTime = pSender->startTime;
  pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // send msg
  syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
  if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
    sSError(pSender, "prepare snapshot failed since send msg error");
    return -1;
  }

  return 0;
}

906 907 908 909 910 911
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
S
Shengliang Guan 已提交
912
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
913 914
  SyncSnapshotRsp *pMsg = pRpcMsg->pCont;

915
  // if already drop replica, do not process
916
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
M
Minghao Li 已提交
917
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
918
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
919
    return -1;
920 921
  }

M
Minghao Li 已提交
922
  // get sender
S
Shengliang Guan 已提交
923 924 925 926 927 928
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
929 930

  // state, term, seq/ack
931
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
932
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
933
    sSError(pSender, "snapshot sender not leader");
934
    terrno = TSDB_CODE_SYN_NOT_LEADER;
935 936 937 938 939
    goto _ERROR;
  }

  if (pMsg->startTime != pSender->startTime) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
940 941
    sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, error:%s 0x%x", pMsg->startTime,
            pSender->startTime, tstrerror(pMsg->code), pMsg->code);
942
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
943
    goto _ERROR;
944
  }
M
Minghao Li 已提交
945

S
Shengliang Guan 已提交
946
  if (pMsg->term != pSyncNode->raftStore.currentTerm) {
947 948
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
    sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
S
Shengliang Guan 已提交
949
            pSyncNode->raftStore.currentTerm);
950
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
951
    goto _ERROR;
952
  }
S
Shengliang Guan 已提交
953

954 955
  if (pMsg->code != 0) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
956
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
957
    terrno = pMsg->code;
958
    goto _ERROR;
959
  }
M
Minghao Li 已提交
960

961
  // prepare <begin, end>, send begin msg
962
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
963
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
964
    return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
965
  }
966

967 968 969 970 971 972 973
  if (pSender->pReader == NULL || pSender->finish) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
    sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
    terrno = pMsg->code;
    goto _ERROR;
  }

974 975 976 977 978
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
      return -1;
    }
979

980 981 982 983 984 985 986 987 988 989 990
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
    return 0;
  }

  // receive ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
991
    syncLogReplMgrReset(pMgr);
992 993 994 995 996 997 998 999
    return 0;
  }

  // send next msg
  if (pMsg->ack == pSender->seq) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
    // update sender ack
    if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
1000
      return -1;
1001
    }
1002 1003 1004 1005 1006 1007
    if (snapshotSend(pSender) != 0) {
      return -1;
    }
  } else if (pMsg->ack == pSender->seq - 1) {
    // maybe resend
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
1008 1009 1010
    if (snapshotReSend(pSender) != 0) {
      return -1;
    }
M
Minghao Li 已提交
1011
  } else {
1012
    // error log
1013 1014 1015 1016
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
    sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
    snapshotSenderStop(pSender, true);
    SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
1017
    syncLogReplMgrReset(pMgr);
1018
    return -1;
1019 1020 1021
  }

  return 0;
1022 1023 1024 1025

_ERROR:
  snapshotSenderStop(pSender, true);
  SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
1026
  syncLogReplMgrReset(pMgr);
1027 1028

  return -1;
1029
}