syncSnapshot.c 29.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21
#include "syncRaftLog.h"
M
Minghao Li 已提交
22
#include "syncRaftStore.h"
M
Minghao Li 已提交
23
#include "syncReplication.h"
M
Minghao Li 已提交
24
#include "syncUtil.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26
// Allocate and initialise a snapshot sender for one replica.
//
// A sender is only created when the FSM provides all three snapshot read
// callbacks (start / stop / do-read); otherwise an error is logged and NULL
// is returned.
//
// pSyncNode:    owning sync node, also the source of the initial term
// replicaIndex: stored in the sender; later used to index replicasId
//
// return the new sender, or NULL when the callbacks are missing or the
// allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY in the latter case)
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // ownership / identity
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;
  pNew->term = pSyncNode->pRaftStore->currentTerm;

  // transfer state: nothing in flight yet
  pNew->start = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pNew->startTime = 0;
  pNew->endTime = 0;

  // let the FSM fill in the snapshot info it currently has
  pNew->pSyncNode->pFsm->FpGetSnapshotInfo(pNew->pSyncNode->pFsm, &(pNew->snapshot));
  pNew->finish = false;

  return pNew;
}
M
Minghao Li 已提交
58

M
Minghao Li 已提交
59 60
// Release a snapshot sender together with everything it still holds:
// the cached data block, the FSM snapshot reader (if open) and the sender
// object itself. Passing NULL is a no-op.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // drop the block cached for retransmission
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // close the reader; a failure is only logged because the sender goes away anyway
  if (pSender->pReader != NULL) {
    if (pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader) != 0) {
      sNError(pSender->pSyncNode, "stop reader error");
    }
    pSender->pReader = NULL;
  }

  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
80

M
Minghao Li 已提交
81 82
// Report the sender's `start` flag (set by snapshotSenderStart, cleared by
// snapshotSenderStop). pSender must not be NULL.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  bool started = pSender->start;
  return started;
}

M
Minghao Li 已提交
83
// Begin a new snapshot transfer: reset the sender's per-transfer state and
// send the SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT message to the peer selected by
// replicaIndex. The sender must not already be started (asserted).
//
// return 0, pre-snapshot message was built and handed to syncNodeSendMsgById
// return -1, the message could not be built; the sender is left not started
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  ASSERT(!snapshotSenderIsStart(pSender));

  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;

  // <begin, end> is not known yet; it is filled in when the peer replies
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;

  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  memset(&(pSender->lastConfig), 0, sizeof(pSender->lastConfig));
  pSender->sendingMS = 0;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pSender->startTime = taosGetTimestampMs();
  pSender->lastSendTime = pSender->startTime;
  pSender->finish = false;

  // build begin msg
  // pCont is dereferenced right below, so bail out when the message could not be built.
  // NOTE(review): assumes a non-zero return from syncBuildSnapshotSend means failure - confirm.
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sNError(pSender->pSyncNode, "snapshot sender start error, build msg failed");
    pSender->start = false;  // roll back so a later start attempt is not rejected by the assert above
    return -1;
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->startTime = pSender->startTime;
  pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;

  // send msg
  syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");

  // event log
  sSTrace(pSender, "snapshot sender start");
  return 0;
}

134
// Stop the sender: clear the start flag, remember whether the transfer
// finished, stamp the end time, then close the FSM reader and drop the
// cached block.
//
// finish: stored in pSender->finish
// return 0 always; a failing FpSnapshotStopRead trips the ASSERT
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  // record the outcome
  pSender->endTime = taosGetTimestampMs();
  pSender->finish = finish;
  pSender->start = false;

  // shut down the reader, if one is open
  if (pSender->pReader != NULL) {
    int32_t stopCode = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(stopCode == 0);
    pSender->pReader = NULL;
  }

  // forget the block kept for retransmission
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  // event log
  sSTrace(pSender, "snapshot sender stop");
  return 0;
}

159
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
160
// seq = ack + 1, already updated
M
Minghao Li 已提交
161
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
162
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
163 164
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
165
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
166 167 168 169 170 171
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
172
  ASSERT(ret == 0);
M
Minghao Li 已提交
173 174 175
  if (pSender->blockLen > 0) {
    // has read data
  } else {
176
    // read finish, update seq to end
M
Minghao Li 已提交
177 178
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
179

M
Minghao Li 已提交
180
  // build msg
181 182 183 184
  SRpcMsg rpcMsg = {0};
  (void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
185 186 187
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
188
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
189 190
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
191 192
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
193
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
194 195 196

  // pMsg->privateTerm = pSender->privateTerm;

M
Minghao Li 已提交
197 198 199
  if (pSender->pCurrentBlock != NULL) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }
M
Minghao Li 已提交
200

M
Minghao Li 已提交
201
  // send msg
202
  syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
203
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
M
Minghao Li 已提交
204

205 206
  pSender->lastSendTime = taosGetTimestampMs();

207
  // event log
S
Shengliang Guan 已提交
208 209 210 211 212
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
    sSTrace(pSender, "snapshot sender finish");
  } else {
    sSTrace(pSender, "snapshot sender sending");
  }
M
Minghao Li 已提交
213 214 215
  return 0;
}

M
Minghao Li 已提交
216 217
// Retransmit the block cached in pSender->pCurrentBlock with the current
// pSender->seq. Nothing is read from the FSM reader here.
//
// return 0, message handed to syncNodeSendMsgById
// return -1, the message could not be built
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  // build msg
  // pCont is dereferenced right below, so bail out when the message could not be built.
  // NOTE(review): assumes a non-zero return from syncBuildSnapshotSend means failure - confirm.
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sNError(pSender->pSyncNode, "snapshot sender resend error, build msg failed");
    return -1;
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;

  // copy the cached payload, if there is one
  if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // send msg
  syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
  syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");

  pSender->lastSendTime = taosGetTimestampMs();

  // event log
  sSTrace(pSender, "snapshot sender resend");

  return 0;
}

252
// Record an acknowledged sequence number and move on to the next one.
// The ack must match the sequence that was just sent (asserted).
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  ASSERT(pMsg->ack == pSender->seq);

  pSender->ack = pMsg->ack;
  pSender->seq += 1;
}

M
Minghao Li 已提交
258 259 260
// return 0, start ok
// return 1, last snapshot finish ok
// return -1, error
261
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
S
Shengliang Guan 已提交
262
  sNTrace(pSyncNode, "starting snapshot ...");
M
Minghao Li 已提交
263

264 265
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
S
Shengliang Guan 已提交
266
    sNError(pSyncNode, "start snapshot error, sender is null");
M
Minghao Li 已提交
267
    return -1;
268 269
  }

M
Minghao Li 已提交
270 271 272
  int32_t code = 0;

  if (snapshotSenderIsStart(pSender)) {
M
Minghao Li 已提交
273 274 275 276 277
    sNTrace(pSyncNode, "snapshot sender already start, ignore");
    return 0;
  }

  if (!snapshotSenderIsStart(pSender) && pSender->finish &&
M
Minghao Li 已提交
278
      taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
M
Minghao Li 已提交
279 280
    sNTrace(pSyncNode, "snapshot sender too frequently, ignore");
    return 1;
M
Minghao Li 已提交
281 282
  }

283 284 285 286 287
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
  sInfo("vgId:%d, start snapshot for peer: %s:%d", pSyncNode->vgId, host, port);

M
Minghao Li 已提交
288 289
  code = snapshotSenderStart(pSender);
  if (code != 0) {
S
Shengliang Guan 已提交
290
    sNError(pSyncNode, "snapshot sender start error");
M
Minghao Li 已提交
291 292
    return -1;
  }
293 294 295

  return 0;
}
296

297
// Allocate and initialise a snapshot receiver.
//
// A receiver is only created when the FSM provides all three snapshot write
// callbacks (start / stop / do-write); otherwise an error is logged and NULL
// is returned.
//
// pSyncNode: owning sync node, also the source of the initial term
// fromId:    stored as the receiver's fromId
//
// return the new receiver, or NULL when the callbacks are missing or the
// allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY in the latter case)
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // ownership / identity
  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;
  pNew->term = pSyncNode->pRaftStore->currentTerm;

  // transfer state: not started, no writer
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  // no snapshot known yet
  pNew->snapshot.data = NULL;
  pNew->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pNew->snapshot.lastApplyTerm = 0;
  pNew->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  return pNew;
}
M
Minghao Li 已提交
326

327 328
// Release a snapshot receiver. An open writer is closed first with the
// apply flag set to false; the close must succeed (asserted).
// Passing NULL is a no-op.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer
  if (pReceiver->pWriter != NULL) {
    int32_t stopCode = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm,
                                                                       pReceiver->pWriter, false,
                                                                       &(pReceiver->snapshot));
    ASSERT(stopCode == 0);
    pReceiver->pWriter = NULL;
  }

  // free receiver
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
341

342 343
// Report the receiver's `start` flag (set by snapshotReceiverStart, cleared
// by the stop functions). pReceiver must not be NULL.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  bool started = pReceiver->start;
  return started;
}

344
// force stop
345
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
346 347
  // force close, abandon incomplete data
  if (pReceiver->pWriter != NULL) {
348 349
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                                  &(pReceiver->snapshot));
350
    ASSERT(ret == 0);
351 352 353 354
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
355 356

  // event log
S
Shengliang Guan 已提交
357
  sRTrace(pReceiver, "snapshot receiver force stop");
358 359
}

M
Minghao Li 已提交
360
// Handle the begin message on an already started receiver: take over the
// snapshot description from the message and open the FSM snapshot writer
// for the range <beginIndex, lastIndex>.
//
// The receiver must be started and must not have a writer yet (asserted);
// a failing FpSnapshotStartWrite also trips an ASSERT.
//
// return 0 always
int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  ASSERT(snapshotReceiverIsStart(pReceiver));

  // data blocks are expected to follow SYNC_SNAPSHOT_SEQ_BEGIN
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // range to be written
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // snapshot description sent by the peer
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;

  // start writer
  ASSERT(pReceiver->pWriter == NULL);
  int32_t startCode = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(
      pReceiver->pSyncNode->pFsm, &(pReceiver->snapshotParam), &(pReceiver->pWriter));
  ASSERT(startCode == 0);

  // event log
  sRTrace(pReceiver, "snapshot receiver start writer");

  return 0;
}

// Mark the receiver as started in response to a pre-snapshot message and
// adopt the sender's id and start time. An already started receiver is left
// untouched (a warning is logged).
//
// return 0 always
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (!snapshotReceiverIsStart(pReceiver)) {
    pReceiver->start = true;
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
    pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
    pReceiver->fromId = pPreMsg->srcId;
    pReceiver->startTime = pPreMsg->startTime;

    // event log
    sRTrace(pReceiver, "snapshot receiver start");
  } else {
    sWarn("vgId:%d, snapshot receiver has started.", pReceiver->pSyncNode->vgId);
  }

  return 0;
}
M
Minghao Li 已提交
403

404 405
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
406
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
M
Minghao Li 已提交
407
  if (pReceiver->pWriter != NULL) {
408 409
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                                  &(pReceiver->snapshot));
410
    ASSERT(ret == 0);
M
Minghao Li 已提交
411
    pReceiver->pWriter = NULL;
412
  }
M
Minghao Li 已提交
413 414

  pReceiver->start = false;
415

416
  // event log
S
Shengliang Guan 已提交
417
  sRTrace(pReceiver, "snapshot receiver stop");
418
  return 0;
419 420
}

421
// when recv last snapshot block, apply data into snapshot
422
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
423
  ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
424

425
  int32_t code = 0;
426
  if (pReceiver->pWriter != NULL) {
427
    // write data
428 429 430
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
431
      if (code != 0) {
S
Shengliang Guan 已提交
432
        sNError(pReceiver->pSyncNode, "snapshot write error");
433 434 435 436 437 438 439 440
        return -1;
      }
    }

    // reset wal
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
S
Shengliang Guan 已提交
441
      sNError(pReceiver->pSyncNode, "wal restore from snapshot error");
442
      return -1;
443 444
    }

445 446 447 448 449
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

M
Minghao Li 已提交
450 451 452 453 454 455
    // maybe update term
    if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
      pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
      raftStorePersist(pReceiver->pSyncNode->pRaftStore);
    }

456
    // stop writer, apply data
457 458
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
                                                           &(pReceiver->snapshot));
459
    if (code != 0) {
S
Shengliang Guan 已提交
460
      sNError(pReceiver->pSyncNode, "snapshot stop writer true error");
461 462
      return -1;
    }
463 464
    pReceiver->pWriter = NULL;

465 466
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
467

468
  } else {
S
Shengliang Guan 已提交
469
    sNError(pReceiver->pSyncNode, "snapshot stop writer true error");
470
    return -1;
471 472 473
  }

  // event log
S
Shengliang Guan 已提交
474
  sRTrace(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
475
  return 0;
476 477
}

478 479
// apply data block
// update progress
480
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
481
  ASSERT(pMsg->seq == pReceiver->ack + 1);
482 483 484

  if (pReceiver->pWriter != NULL) {
    if (pMsg->dataLen > 0) {
485
      // apply data block
486 487
      int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   pMsg->data, pMsg->dataLen);
488
      ASSERT(code == 0);
489
    }
490 491

    // update progress
492 493 494
    pReceiver->ack = pMsg->seq;

    // event log
S
Shengliang Guan 已提交
495
    sRTrace(pReceiver, "snapshot receiver receiving");
496
  }
497 498
}

M
Minghao Li 已提交
499 500 501 502 503 504 505 506 507 508 509 510 511 512
// Compute the index from which this node wants a snapshot to begin.
//
// mnode:      SYNC_INDEX_BEGIN
// otherwise:  commitIndex + 1, except when the log is not empty and the WAL's
//             committed version disagrees with commitIndex; then an error is
//             logged and walCommitVer + 1 is used instead
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    return SYNC_INDEX_BEGIN;
  }

  SSyncLogStoreData *pStoreData = ths->pLogStore->data;
  SWal              *pWal = pStoreData->pWal;

  bool    logEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
  int64_t walCommitVer = walGetCommittedVer(pWal);

  if (logEmpty || ths->commitIndex == walCommitVer) {
    return ths->commitIndex + 1;
  }

  sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer,
          ths->commitIndex);
  return walCommitVer + 1;
}

// Handle SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT on the receiving side.
//
// receiver already started:
//   msg.startTime >  receiver.startTime : restart the receiver, then reply
//   msg.startTime == receiver.startTime : (duplicate) just reply again
//   msg.startTime <  receiver.startTime : ignore, no reply
// receiver not started: start it, then reply
//
// Before (re)starting, the message is rejected when the local clock is more
// than SNAPSHOT_MAX_CLOCK_SKEW_MS ahead of msg.startTime; otherwise the
// function sleeps until the local clock has reached msg.startTime.
// NOTE(review): only one direction of skew is bounded; a startTime far in the
// future makes the wait loop block for that long - confirm this is intended.
//
// return 0, replied or ignored
// return -1, clock skew too large, or the reply could not be built
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  bool needStart = true;
  if (snapshotReceiverIsStart(pReceiver)) {
    if (pMsg->startTime < pReceiver->startTime) {
      // older than the running transfer, ignore
      return 0;
    }
    // equal start time: same transfer, only the reply has to be repeated
    needStart = (pMsg->startTime > pReceiver->startTime);
  }

  if (needStart) {
    if (taosGetTimestampMs() - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
      sNError(pSyncNode, "snapshot receiver time skew too much");
      return -1;
    }

    // waiting for clock match
    int64_t timeNow = taosGetTimestampMs();
    while (timeNow < pMsg->startTime) {
      sNTrace(pSyncNode, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
              pMsg->startTime);
      taosMsleep(10);
      timeNow = taosGetTimestampMs();
    }

    if (snapshotReceiverIsStart(pReceiver)) {
      snapshotReceiverForceStop(pReceiver);
    }

    snapshotReceiverStart(pReceiver, pMsg);  // set start-time same with sender
  }

  // build msg
  // pCont is dereferenced right below, so bail out when the reply could not be built.
  // NOTE(review): assumes a non-zero return from syncBuildSnapshotSendRsp means failure - confirm.
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sNError(pSyncNode, "snapshot receiver pre error, build rsp failed");
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pMsg->seq;  // receiver maybe already closed
  pRspMsg->code = 0;
  pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);

  // send msg
  syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
  return 0;
}

// Handle SYNC_SNAPSHOT_SEQ_BEGIN on the receiving side: open the snapshot
// writer for the range announced in the message and acknowledge.
//
// return 0, writer started and reply sent
// return -1, receiver not started, start time differs from the running
//            transfer, or the reply could not be built
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sNError(pSyncNode, "snapshot receiver not start");
    return -1;
  }

  // the message must belong to the transfer the receiver was started for
  if (pReceiver->startTime != pMsg->startTime) {
    sNError(pSyncNode, "snapshot receiver time not equal");
    return -1;
  }

  // start writer
  snapshotReceiverStartWriter(pReceiver, pMsg);

  // build msg
  // pCont is dereferenced right below, so bail out when the reply could not be built.
  // NOTE(review): assumes a non-zero return from syncBuildSnapshotSendRsp means failure - confirm.
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sNError(pSyncNode, "snapshot receiver begin error, build rsp failed");
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = 0;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
  return 0;
}

// Handle a data message (SYNC_SNAPSHOT_SEQ_BEGIN < seq < SYNC_SNAPSHOT_SEQ_END)
// on the receiving side. An in-order block (seq == ack + 1) is written and
// acknowledged; any other seq is not written and the reply simply repeats
// the receiver's current ack.
//
// return 0, reply sent
// return -1, the reply could not be built
static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // waiting for clock match
  int64_t timeNow = taosGetTimestampMs();
  while (timeNow < pMsg->startTime) {
    sNTrace(pSyncNode, "snapshot receiver transfering waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
            pMsg->startTime);
    taosMsleep(10);
    timeNow = taosGetTimestampMs();  // re-read the clock, otherwise the loop can never terminate
  }

  if (pMsg->seq == pReceiver->ack + 1) {
    snapshotReceiverGotData(pReceiver, pMsg);
  }

  // build msg
  // pCont is dereferenced right below, so bail out when the reply could not be built.
  // NOTE(review): assumes a non-zero return from syncBuildSnapshotSendRsp means failure - confirm.
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sNError(pSyncNode, "snapshot receiver transfering error, build rsp failed");
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = 0;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
  return 0;
}

M
Minghao Li 已提交
667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685
// Handle SYNC_SNAPSHOT_SEQ_END on the receiving side: finish the transfer
// (apply the snapshot) and, when that succeeded, stop the receiver. A reply
// carrying the receiver's current ack is sent in both cases; its code field
// is always 0.
//
// return 0, reply sent
// return -1, the reply could not be built
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  // waiting for clock match
  int64_t timeNow = taosGetTimestampMs();
  while (timeNow < pMsg->startTime) {
    sNTrace(pSyncNode, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
            pMsg->startTime);
    taosMsleep(10);
    timeNow = taosGetTimestampMs();  // re-read the clock, otherwise the loop can never terminate
  }

  // on failure the receiver is left as is; ack then still names the last block written
  int32_t code = snapshotReceiverFinish(pReceiver, pMsg);
  if (code == 0) {
    snapshotReceiverStop(pReceiver);
  }

  // build msg
  // pCont is dereferenced right below, so bail out when the reply could not be built.
  // NOTE(review): assumes a non-zero return from syncBuildSnapshotSendRsp means failure - confirm.
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sNError(pSyncNode, "snapshot receiver finish error, build rsp failed");
    return -1;
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pReceiver->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = 0;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
  return 0;
}
M
Minghao Li 已提交
705

706 707
// receiver on message
//
M
Minghao Li 已提交
708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
//              if receiver already start
//                    if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
//                    if sender.start-time = receiver.start-time, maybe duplicate msg
//                    if sender.start-time < receiver.start-time, ignore
//              else
//                    waiting for clock match
//                    start receiver(reply snapshot start)
//
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
//              a. create writer with <begin, end>
//
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
//
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
//
// condition 5, got data, update ack
M
Minghao Li 已提交
725
//
726 727 728
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
  SyncSnapshotSend *pMsg = pRpcMsg->pCont;

M
Minghao Li 已提交
729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
    return 0;
  }

  if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject, small term");
    return 0;
  }

  if (pMsg->term > pSyncNode->pRaftStore->currentTerm) {
    syncNodeStepDown(pSyncNode, pMsg->term);
  }
  syncNodeResetElectTimer(pSyncNode);

745
  int32_t                code = 0;
M
Minghao Li 已提交
746
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
M
Minghao Li 已提交
747

M
Minghao Li 已提交
748 749
  syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "");

750 751
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
752
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
753 754 755 756 757
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
        syncNodeOnSnapshotPre(pSyncNode, pMsg);

      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        syncNodeOnSnapshotBegin(pSyncNode, pMsg);
M
Minghao Li 已提交
758 759

      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
M
Minghao Li 已提交
760
        syncNodeOnSnapshotEnd(pSyncNode, pMsg);
761
        (void)syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
762

M
Minghao Li 已提交
763
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
M
Minghao Li 已提交
764
        // force close, no response
765
        snapshotReceiverForceStop(pReceiver);
M
Minghao Li 已提交
766 767

      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
M
Minghao Li 已提交
768
        syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
M
Minghao Li 已提交
769 770

      } else {
M
Minghao Li 已提交
771
        // error log
S
Shengliang Guan 已提交
772
        sRTrace(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
M
Minghao Li 已提交
773
        return -1;
774
      }
M
Minghao Li 已提交
775

776 777
    } else {
      // error log
S
Shengliang Guan 已提交
778
      sRTrace(pReceiver, "snapshot receiver term not equal");
779
      return -1;
M
Minghao Li 已提交
780
    }
M
Minghao Li 已提交
781
  } else {
782
    // error log
S
Shengliang Guan 已提交
783
    sRTrace(pReceiver, "snapshot receiver not follower");
784
    return -1;
M
Minghao Li 已提交
785
  }
M
Minghao Li 已提交
786

M
Minghao Li 已提交
787 788 789
  return 0;
}

M
Minghao Li 已提交
790 791 792
// Sender side: handle the peer's reply to the pre-snapshot message.
// Fixes the transfer range <peer's snapBeginIndex, local snapshot's
// lastApplyIndex>, opens the FSM snapshot reader, sets the peer's next index
// to lastApplyIndex + 1 and sends the SYNC_SNAPSHOT_SEQ_BEGIN message.
//
// return 0, begin message handed to syncNodeSendMsgById
// return -1, the peer asks to begin beyond the local snapshot, the reader
//            could not be created, or the message could not be built
int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
  // get sender
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
  ASSERT(pSender != NULL);

  // zero-initialise: the struct is copied into the sender below, so no field may be left indeterminate
  // in case the callback does not fill all of them
  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // prepare <begin, end>
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapshot.lastApplyIndex;

  sNTrace(pSyncNode, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
          pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);

  if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
    sNError(pSyncNode, "snapshot last index too small");
    return -1;
  }

  // update sender
  pSender->snapshot = snapshot;

  // start reader
  int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader));
  if (code != 0) {
    sNError(pSyncNode, "create snapshot reader error");
    return -1;
  }

  // update next index
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);

  // update seq
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // build begin msg
  // pCont is dereferenced right below, so bail out when the message could not be built.
  // The reader opened above stays attached to the sender; snapshotSenderStop/Destroy close it.
  // NOTE(review): assumes a non-zero return from syncBuildSnapshotSend means failure - confirm.
  SRpcMsg rpcMsg = {0};
  if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sNError(pSyncNode, "prepare snapshot error, build msg failed");
    return -1;
  }

  SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
  pSendMsg->srcId = pSender->pSyncNode->myRaftId;
  pSendMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pSendMsg->beginIndex = pSender->snapshotParam.start;
  pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pSendMsg->lastConfig = pSender->lastConfig;
  pSendMsg->startTime = pSender->startTime;
  pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;

  // send msg
  syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg);
  syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "");

  return 0;
}

849 850 851 852 853 854
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
855 856 857
int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
  SyncSnapshotRsp *pMsg = pRpcMsg->pCont;

858
  // if already drop replica, do not process
M
Minghao Li 已提交
859 860
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
861
    return -1;
862 863
  }

M
Minghao Li 已提交
864
  // get sender
865
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
866
  ASSERT(pSender != NULL);
867

M
Minghao Li 已提交
868 869 870 871 872
  if (pMsg->startTime != pSender->startTime) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender/receiver start time not match");
    return -1;
  }

M
Minghao Li 已提交
873 874
  syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "");

875 876 877
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
878 879 880 881 882 883
      // prepare <begin, end>, send begin msg
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
        syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
        return 0;
      }

M
Minghao Li 已提交
884 885 886 887 888 889
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
        snapshotSenderUpdateProgress(pSender, pMsg);
        snapshotSend(pSender);
        return 0;
      }

890
      // receive ack is finish, close sender
M
Minghao Li 已提交
891
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
892
        snapshotSenderStop(pSender, true);
893
        SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
894
        if (pMgr) {
895
          syncLogReplMgrReset(pMgr);
896
        }
M
Minghao Li 已提交
897 898 899 900
        return 0;
      }

      // send next msg
901
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
902
        // update sender ack
903
        snapshotSenderUpdateProgress(pSender, pMsg);
M
Minghao Li 已提交
904
        snapshotSend(pSender);
905

M
Minghao Li 已提交
906
      } else if (pMsg->ack == pSender->seq - 1) {
907
        // maybe resend
M
Minghao Li 已提交
908
        snapshotReSend(pSender);
909

M
Minghao Li 已提交
910
      } else {
911
        // error log
S
Shengliang Guan 已提交
912
        sSError(pSender, "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
913
        return -1;
914
      }
915 916
    } else {
      // error log
S
Shengliang Guan 已提交
917
      sSError(pSender, "snapshot sender term not equal");
918
      return -1;
919
    }
M
Minghao Li 已提交
920
  } else {
921
    // error log
S
Shengliang Guan 已提交
922
    sSError(pSender, "snapshot sender not leader");
923
    return -1;
924 925 926
  }

  return 0;
927
}