syncSnapshot.c 28.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
//----------------------------------
M
Minghao Li 已提交
25 26
// Forward declarations of file-local receiver helpers that are defined further down in this file.

// Appends one in-sequence data block to the receiver's open writer.
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);

// Resets the receiver from a "begin" message and opens a snapshot writer.
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
                                    SyncSnapshotSend *pBeginMsg);
M
Minghao Li 已提交
28

29
//----------------------------------
M
Minghao Li 已提交
30
// Allocate and initialise a snapshot sender bound to the replica at `replicaIndex`.
//
// A sender is only created when the FSM provides all three snapshot-read callbacks
// (FpSnapshotStartRead, FpSnapshotStopRead, FpSnapshotDoRead); otherwise an error is
// logged and NULL is returned. Allocation failure is caught by ASSERT.
//
// The new sender is not started, has no reader and no cached block, and its snapshot
// field is filled from the FSM via FpGetSnapshotInfo.
// NOTE(review): FpGetSnapshotInfo is called without a NULL check - confirm every FSM sets it.
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  bool canRead = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                 (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
  if (!canRead) {
    sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // binding to the node / replica
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;

  // transfer state: idle, nothing sent, nothing acknowledged
  pNew->start = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // no reader and no cached block yet
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;

  // terms: the raft term plus a sender-private term seeded from the current timestamp
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = taosGetTimestampMs() + 100;

  // snapshot description as currently reported by the FSM
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &(pNew->snapshot));
  pNew->finish = false;

  return pNew;
}
M
Minghao Li 已提交
59

M
Minghao Li 已提交
60 61
// Release a sender: frees the cached block, closes the FSM reader if one is still open
// (FpSnapshotStopRead must return 0, asserted), then frees the sender itself.
// A NULL sender is ignored.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // free current block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // close reader
  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    int32_t    rc = pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    ASSERT(rc == 0);
    pSender->pReader = NULL;
  }

  // free sender
  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
79

M
Minghao Li 已提交
80 81
// Return the sender's `start` flag (set by snapshotSenderStart, cleared by snapshotSenderStop).
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

82
// begin send snapshot by snapshot, pReader
83
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) {
M
Minghao Li 已提交
84
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
85

86
  // init snapshot and reader
M
Minghao Li 已提交
87
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
88 89 90
  pSender->pReader = pReader;
  pSender->snapshot = snapshot;

91
  // init current block
M
Minghao Li 已提交
92 93 94 95 96
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }
  pSender->blockLen = 0;

97 98 99 100 101 102 103 104 105 106 107
  // update term
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);

  // update state
  pSender->finish = false;
  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // init last config
108
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
109 110
    int32_t         code = 0;
    SSyncRaftEntry *pEntry = NULL;
111 112
    bool            getLastConfig = false;

113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
    code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                          pSender->snapshot.lastConfigIndex, &pEntry);
    if (code == 0) {
      ASSERT(pEntry != NULL);

      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pEntry, &rpcMsg);

      SSyncCfg lastConfig;
      int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
      ASSERT(ret == 0);
      pSender->lastConfig = lastConfig;
      getLastConfig = true;

      rpcFreeCont(rpcMsg.pCont);
      syncEntryDestory(pEntry);
    } else {
      if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
131
        sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
132 133 134 135 136
        pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
        getLastConfig = true;
      }
    }

137
    // last config not found in wal, update to -1
138
    if (!getLastConfig) {
139 140 141
      SyncIndex oldLastConfigIndex = pSender->snapshot.lastConfigIndex;
      SyncIndex newLastConfigIndex = SYNC_INDEX_INVALID;
      pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
142
      memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
143 144 145 146 147 148 149 150 151 152

      // event log
      do {
        char logBuf[128];
        snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %ld to %ld", oldLastConfigIndex,
                 newLastConfigIndex);
        char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
        syncNodeEventLog(pSender->pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);
153
    }
154 155

  } else {
156
    // no last config
157 158
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }
M
Minghao Li 已提交
159

M
Minghao Li 已提交
160
  // build begin msg
M
Minghao Li 已提交
161 162 163 164 165 166
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
167 168
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
169
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
170
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
171

M
Minghao Li 已提交
172
  // send msg
M
Minghao Li 已提交
173 174 175 176
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pMsg);
177 178 179

  // event log
  do {
180
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
181 182 183
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
184 185

  return 0;
M
Minghao Li 已提交
186 187
}

188
// Stop the sender: close the FSM reader if one is open (FpSnapshotStopRead must return 0,
// asserted), free the cached block, clear `start`, record `finish`, and write an event log.
// Always returns 0.
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  SSyncNode *pNode = pSender->pSyncNode;

  // close reader
  if (pSender->pReader != NULL) {
    int32_t rc = pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    ASSERT(rc == 0);
    pSender->pReader = NULL;
  }

  // free current block (blockLen is only reset when a block was actually cached)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  // update flag
  pSender->start = false;
  pSender->finish = finish;

  // event log
  {
    char *logStr = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
    syncNodeEventLog(pSender->pSyncNode, logStr);
    taosMemoryFree(logStr);
  }

  return 0;
}

217
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
218
// seq = ack + 1, already updated
M
Minghao Li 已提交
219
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
220
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
221 222
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
223
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
224 225 226 227 228 229 230
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
231 232 233
  if (pSender->blockLen > 0) {
    // has read data
  } else {
234
    // read finish, update seq to end
M
Minghao Li 已提交
235 236
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
237

M
Minghao Li 已提交
238
  // build msg
M
Minghao Li 已提交
239 240 241 242 243 244
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
245 246
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
247
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
248
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
249 250
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
251
  // send msg
M
Minghao Li 已提交
252 253 254
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
255
  syncSnapshotSendDestroy(pMsg);
M
Minghao Li 已提交
256

257 258 259 260 261 262 263 264
  // event log
  do {
    char *eventLog = NULL;
    if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender finish");
    } else {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender sending");
    }
M
Minghao Li 已提交
265 266
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
267
  } while (0);
M
Minghao Li 已提交
268

M
Minghao Li 已提交
269 270 271
  return 0;
}

M
Minghao Li 已提交
272 273
// Send the cached block (pSender->pCurrentBlock) again with the current sequence number.
//
// Nothing is read from the FSM. When no block with a positive length is cached the
// function does nothing. Always returns 0.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  // nothing cached, nothing to resend
  if (pSender->pCurrentBlock == NULL || pSender->blockLen <= 0) {
    return 0;
  }

  // build msg
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;
  // carry the private term like the messages built by snapshotSenderStart()/snapshotSend()
  pMsg->privateTerm = pSender->privateTerm;
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pMsg);

  // event log
  do {
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender resend");
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);

  return 0;
}

M
Minghao Li 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
// Build a cJSON tree describing the sender, wrapped as {"SSyncSnapshotSender": {...}}.
// A NULL sender yields an empty inner object. The caller owns the returned tree.
// NOTE(review): when a block is cached the key "pCurrentBlock" is added twice (first the
// pointer text, then the syncUtilprintBin output) - confirm this is intended.
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   text[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pSender != NULL) {
    // progress
    cJSON_AddNumberToObject(pFields, "start", pSender->start);
    cJSON_AddNumberToObject(pFields, "seq", pSender->seq);
    cJSON_AddNumberToObject(pFields, "ack", pSender->ack);

    // reader and cached block
    snprintf(text, sizeof(text), "%p", pSender->pReader);
    cJSON_AddStringToObject(pFields, "pReader", text);

    snprintf(text, sizeof(text), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pFields, "pCurrentBlock", text);
    cJSON_AddNumberToObject(pFields, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      char *dump = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock", dump);
      taosMemoryFree(dump);

      char *dump2 = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock2", dump2);
      taosMemoryFree(dump2);
    }

    // nested snapshot object
    cJSON *pSnap = cJSON_CreateObject();
    snprintf(text, sizeof(text), "%lu", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnap, "lastApplyIndex", text);
    snprintf(text, sizeof(text), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnap, "lastApplyTerm", text);
    cJSON_AddItemToObject(pFields, "snapshot", pSnap);

    // remaining scalar fields
    snprintf(text, sizeof(text), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pFields, "sendingMS", text);

    snprintf(text, sizeof(text), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", text);

    cJSON_AddNumberToObject(pFields, "replicaIndex", pSender->replicaIndex);

    snprintf(text, sizeof(text), "%lu", pSender->term);
    cJSON_AddStringToObject(pFields, "term", text);

    snprintf(text, sizeof(text), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pFields, "privateTerm", text);

    cJSON_AddNumberToObject(pFields, "finish", pSender->finish);
  }

  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotSender", pFields);
  return pWrapper;
}

// Serialise the sender (see snapshotSender2Json) to a string produced by cJSON_Print.
// The intermediate JSON tree is deleted here; the returned string belongs to the caller.
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
361 362
// Format a one-line description of the sender, prefixed with `event`, into a freshly
// allocated 256-byte buffer. The callers in this file release it with taosMemoryFree().
// NOTE(review): the allocation result is not checked before it is written to.
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
  int32_t bufLen = 256;
  char   *buf = taosMemoryMalloc(bufLen);

  // resolve host:port of the destination replica
  char     peerHost[64];
  uint16_t peerPort;
  SRaftId  peerId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  syncUtilU642Addr(peerId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(buf, bufLen,
           "%s {%p laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu replica-index:%d %s:%d}", event,
           pSender, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
           pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->privateTerm,
           pSender->replicaIndex, peerHost, peerPort);

  return buf;
}

M
Minghao Li 已提交
379
// -------------------------------------
380
// Allocate and initialise a snapshot receiver that accepts data from `fromId`.
//
// A receiver is only created when the FSM provides all three snapshot-write callbacks
// (FpSnapshotStartWrite, FpSnapshotStopWrite, FpSnapshotDoWrite); otherwise an error is
// logged and NULL is returned. Allocation failure is caught by ASSERT.
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  bool canWrite = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                  (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
  if (!canWrite) {
    sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // binding to the node / peer
  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;

  // transfer state: idle, no writer open
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  // terms
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = 0;

  // no snapshot received yet
  pNew->snapshot.data = NULL;
  pNew->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pNew->snapshot.lastApplyTerm = 0;
  pNew->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  return pNew;
}
M
Minghao Li 已提交
408

409 410
// Release a receiver: closes a still-open writer through FpSnapshotStopWrite with `false`
// as the last argument (must return 0, asserted), then frees the receiver.
// A NULL receiver is ignored.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer
  if (pReceiver->pWriter != NULL) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    rc = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false);
    ASSERT(rc == 0);
    pReceiver->pWriter = NULL;
  }

  // free receiver
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
423

424 425
// Return the receiver's `start` flag (set by snapshotReceiverDoStart, cleared by the stop functions).
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

426
// static do start by privateTerm, pBeginMsg
427
// receive first snapshot data
428
// write first block data
M
Minghao Li 已提交
429 430
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
                                    SyncSnapshotSend *pBeginMsg) {
431
  // update state
M
Minghao Li 已提交
432
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
433
  pReceiver->privateTerm = privateTerm;
M
Minghao Li 已提交
434
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
M
Minghao Li 已提交
435
  pReceiver->fromId = pBeginMsg->srcId;
436
  pReceiver->start = true;
M
Minghao Li 已提交
437

438
  // update snapshot
M
Minghao Li 已提交
439 440 441
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
M
Minghao Li 已提交
442

443
  // write data
M
Minghao Li 已提交
444 445 446
  ASSERT(pReceiver->pWriter == NULL);
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
  ASSERT(ret == 0);
447 448 449 450 451 452 453

  // event log
  do {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
M
Minghao Li 已提交
454 455
}

456
// force stop
457 458 459 460 461 462 463 464 465 466
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
  // force close, abandon incomplete data
  if (pReceiver->pWriter != NULL) {
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
467 468 469 470 471 472 473

  // event log
  do {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
474 475
}

M
Minghao Li 已提交
476 477
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
478
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
479
  if (!snapshotReceiverIsStart(pReceiver)) {
480
    // first start
M
Minghao Li 已提交
481
    snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
M
Minghao Li 已提交
482

483
  } else {
M
Minghao Li 已提交
484
    // already start
485
    sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
M
Minghao Li 已提交
486 487

    // force close, abandon incomplete data
488
    snapshotReceiverForceStop(pReceiver);
M
Minghao Li 已提交
489 490

    // start again
M
Minghao Li 已提交
491
    snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
492
  }
493 494

  return 0;
495
}
M
Minghao Li 已提交
496

497
// Stop the receiver: close an open writer through FpSnapshotStopWrite with `false` as the
// last argument (must return 0, asserted), clear `start`, and write an event log.
// Always returns 0.
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  SSyncNode *pNode = pReceiver->pSyncNode;

  if (pReceiver->pWriter != NULL) {
    int32_t rc = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false);
    ASSERT(rc == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  // event log
  {
    // NOTE(review): the FSM snapshot info is fetched here but not used afterwards
    SSnapshot fsmSnapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &fsmSnapshot);

    char *logStr = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
    syncNodeEventLog(pReceiver->pSyncNode, logStr);
    taosMemoryFree(logStr);
  }

  return 0;
}

519
// Finish a snapshot transfer when the message with seq == SYNC_SNAPSHOT_SEQ_END arrives
// (asserted).
//
// Steps, in order: write the trailing data carried by the message (if any), restore the
// log store from the snapshot at pMsg->lastIndex, raise the node's commitIndex to the
// snapshot's lastApplyIndex when that is larger, close the writer with `true` as the last
// argument, and set ack to SYNC_SNAPSHOT_SEQ_END.
//
// Returns 0 on success and -1 when no writer is open or any step fails; on the early
// failures the writer is left open.
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);

  // without an open writer there is nothing to finish; report this with its own message
  // instead of the stop-writer failure text
  if (pReceiver->pWriter == NULL) {
    syncNodeErrorLog(pReceiver->pSyncNode, "snapshot receiver finish error, writer is null");
    return -1;
  }

  int32_t code = 0;

  // write data
  if (pMsg->dataLen > 0) {
    code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                         pMsg->dataLen);
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "snapshot write error");
      return -1;
    }
  }

  // reset wal
  code = pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
  if (code != 0) {
    syncNodeErrorLog(pReceiver->pSyncNode, "wal restore from snapshot error");
    return -1;
  }

  // update commit index
  if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
    pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
  }

  // stop writer
  code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
  if (code != 0) {
    syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
    ASSERT(0);
    return -1;
  }
  pReceiver->pWriter = NULL;

  // update progress
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  // event log
  do {
    // NOTE(review): the FSM snapshot info is fetched here but not used afterwards
    SSnapshot snapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);

  return 0;
}

// Accept the next in-sequence data block (pMsg->seq must equal ack + 1, asserted).
// With an open writer, the payload (if any) is written through FpSnapshotDoWrite (must
// return 0, asserted), ack advances to pMsg->seq and an event is logged. Without a writer
// the message is ignored.
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  ASSERT(pMsg->seq == pReceiver->ack + 1);

  if (pReceiver->pWriter == NULL) {
    return;
  }

  if (pMsg->dataLen > 0) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    rc = pNode->pFsm->FpSnapshotDoWrite(pNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
    ASSERT(rc == 0);
  }
  pReceiver->ack = pMsg->seq;

  // event log
  {
    char *logStr = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
    syncNodeEventLog(pReceiver->pSyncNode, logStr);
    taosMemoryFree(logStr);
  }
}

// Build a cJSON tree describing the receiver, wrapped as {"SSyncSnapshotReceiver": {...}}.
// A NULL receiver yields an empty inner object. The caller owns the returned tree.
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   text[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pReceiver != NULL) {
    // progress
    cJSON_AddNumberToObject(pFields, "start", pReceiver->start);
    cJSON_AddNumberToObject(pFields, "ack", pReceiver->ack);

    // pointers, printed as text
    snprintf(text, sizeof(text), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pFields, "pWriter", text);

    snprintf(text, sizeof(text), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", text);

    // nested "fromId" object: raw address, decoded host/port, vgId
    cJSON *pPeer = cJSON_CreateObject();
    snprintf(text, sizeof(text), "%lu", pReceiver->fromId.addr);
    cJSON_AddStringToObject(pPeer, "addr", text);

    uint64_t rawAddr = pReceiver->fromId.addr;
    char     peerHost[128] = {0};
    uint16_t peerPort;
    syncUtilU642Addr(rawAddr, peerHost, sizeof(peerHost), &peerPort);
    cJSON_AddStringToObject(pPeer, "addr_host", peerHost);
    cJSON_AddNumberToObject(pPeer, "addr_port", peerPort);

    cJSON_AddNumberToObject(pPeer, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pFields, "fromId", pPeer);

    // snapshot description
    snprintf(text, sizeof(text), "%lu", pReceiver->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pFields, "snapshot.lastApplyIndex", text);

    snprintf(text, sizeof(text), "%lu", pReceiver->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pFields, "snapshot.lastApplyTerm", text);

    snprintf(text, sizeof(text), "%lu", pReceiver->snapshot.lastConfigIndex);
    cJSON_AddStringToObject(pFields, "snapshot.lastConfigIndex", text);

    // terms
    snprintf(text, sizeof(text), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pFields, "term", text);

    snprintf(text, sizeof(text), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pFields, "privateTerm", text);
  }

  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotReceiver", pFields);
  return pWrapper;
}

// Serialise the receiver (see snapshotReceiver2Json) to a string produced by cJSON_Print.
// The intermediate JSON tree is deleted here; the returned string belongs to the caller.
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}
M
Minghao Li 已提交
652

M
Minghao Li 已提交
653 654
// Format a one-line description of the receiver, prefixed with `event`, into a freshly
// allocated 256-byte buffer. The callers in this file release it with taosMemoryFree().
// NOTE(review): the allocation result is not checked before it is written to.
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
  int32_t bufLen = 256;
  char   *buf = taosMemoryMalloc(bufLen);

  // resolve host:port of the peer the snapshot comes from
  char     peerHost[128];
  uint16_t peerPort;
  SRaftId  peerId = pReceiver->fromId;
  syncUtilU642Addr(peerId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(buf, bufLen, "%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld}", event,
           pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, peerHost, peerPort,
           pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);

  return buf;
}

669
// receiver do something
M
Minghao Li 已提交
670
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
671
  // get receiver
672 673
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
674
  int32_t                code = 0;
M
Minghao Li 已提交
675

676 677
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
678 679
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
680
        // begin, no data
M
Minghao Li 已提交
681
        snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
M
Minghao Li 已提交
682 683 684 685
        needRsp = true;

      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
686 687 688 689
        code = snapshotReceiverFinish(pReceiver, pMsg);
        if (code == 0) {
          snapshotReceiverStop(pReceiver);
        }
690
        needRsp = true;
691

692 693
        // maybe update lastconfig
        if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
M
Minghao Li 已提交
694 695
          // int32_t  oldReplicaNum = pSyncNode->replicaNum;
          SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
696

697 698 699
          // update new config myIndex
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
M
Minghao Li 已提交
700 701 702

          // do config change
          syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
703 704
        }

M
Minghao Li 已提交
705
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
706 707
        // force close
        snapshotReceiverForceStop(pReceiver);
M
Minghao Li 已提交
708 709 710 711 712
        needRsp = false;

      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
713
          snapshotReceiverGotData(pReceiver, pMsg);
M
Minghao Li 已提交
714 715 716 717
        }
        needRsp = true;

      } else {
M
Minghao Li 已提交
718 719 720 721 722 723 724 725 726 727
        // error log
        do {
          char logBuf[96];
          snprintf(logBuf, sizeof(logBuf), "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
          char *eventLog = snapshotReceiver2SimpleStr(pReceiver, logBuf);
          syncNodeErrorLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);

        return -1;
728
      }
M
Minghao Li 已提交
729

730
      // send ack
M
Minghao Li 已提交
731
      if (needRsp) {
732
        // build msg
M
Minghao Li 已提交
733 734 735 736 737 738
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
739 740 741
        pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
        pRspMsg->code = 0;
        pRspMsg->privateTerm = pReceiver->privateTerm;  // receiver maybe already closed
M
Minghao Li 已提交
742

743
        // send msg
M
Minghao Li 已提交
744 745 746 747 748
        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
        syncSnapshotRspDestroy(pRspMsg);
      }
749 750 751 752 753 754 755
    } else {
      // error log
      do {
        char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
        syncNodeErrorLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);
M
Minghao Li 已提交
756
    }
M
Minghao Li 已提交
757
  } else {
758 759 760 761 762 763
    // error log
    do {
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);
M
Minghao Li 已提交
764
  }
M
Minghao Li 已提交
765

M
Minghao Li 已提交
766 767 768
  return 0;
}

769 770 771 772 773 774
// Record an accepted ack and advance to the next sequence number.
// The ack must equal the sender's current seq (asserted).
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  ASSERT(pMsg->ack == pSender->seq);

  pSender->ack = pMsg->ack;
  pSender->seq += 1;
}

M
Minghao Li 已提交
775 776
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
777
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
778 779
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
780 781
    sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
    return -1;
782 783
  }

M
Minghao Li 已提交
784
  // get sender
785
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
786 787 788 789 790
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
791 792
      // receiver ack is finish, close sender
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
793
        snapshotSenderStop(pSender, true);
M
Minghao Li 已提交
794 795 796 797
        return 0;
      }

      // send next msg
798
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
799
        // update sender ack
800
        snapshotSenderUpdateProgress(pSender, pMsg);
M
Minghao Li 已提交
801
        snapshotSend(pSender);
802

M
Minghao Li 已提交
803 804
      } else if (pMsg->ack == pSender->seq - 1) {
        snapshotReSend(pSender);
805

M
Minghao Li 已提交
806
      } else {
807
        // error log
808
        do {
M
Minghao Li 已提交
809 810
          char logBuf[96];
          snprintf(logBuf, sizeof(logBuf), "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
811 812 813 814 815 816
          char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
          syncNodeErrorLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);

        return -1;
817
      }
818 819 820 821 822 823 824 825 826
    } else {
      // error log
      do {
        char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
        syncNodeErrorLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);

      return -1;
827
    }
M
Minghao Li 已提交
828
  } else {
829 830 831 832 833 834 835 836
    // error log
    do {
      char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);

    return -1;
837 838 839
  }

  return 0;
840
}