/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
//----------------------------------
25 26 27 28 29
// Forward declarations of the file-local (static) sender/receiver helpers
// that are defined further down in this file.
static void    snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
static void    snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
static void    snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
static void    snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
M
Minghao Li 已提交
30

31
//----------------------------------
M
Minghao Li 已提交
32
// Allocate and initialise a snapshot sender for the replica at `replicaIndex`.
//
// A sender is only created when the FSM provides all three snapshot read
// callbacks (FpSnapshotStartRead / FpSnapshotStopRead / FpSnapshotDoRead).
// Returns NULL when one of them is missing or when the allocation fails.
// A non-NULL result is released with snapshotSenderDestroy().
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  bool canRead = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                 (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
  if (!canRead) {
    sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotSender *pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    // report allocation failure to the caller instead of asserting on it
    sError("vgId:%d, cannot create snapshot sender, out of memory", pSyncNode->vgId);
    return NULL;
  }
  memset(pSender, 0, sizeof(*pSender));

  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
  pSender->term = pSyncNode->pRaftStore->currentTerm;
  // private term starts at taosGetTimestampMs() + 100 and is incremented on every snapshotSenderStart()
  pSender->privateTerm = taosGetTimestampMs() + 100;
  // NOTE(review): FpGetSnapshotInfo is called unconditionally; it is not part of the capability check above
  pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
  pSender->finish = false;

  return pSender;
}
M
Minghao Li 已提交
61

M
Minghao Li 已提交
62 63
// Release a sender created by snapshotSenderCreate(): frees the cached block,
// stops the FSM reader if one is still open, then frees the sender itself.
// A NULL sender is accepted and ignored.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // free current block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // close reader; a failure here is only logged because the sender is going away anyway
  if (pSender->pReader != NULL) {
    int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    if (ret != 0) {
      syncNodeErrorLog(pSender->pSyncNode, "stop reader error");
    }
    pSender->pReader = NULL;
  }

  // free sender
  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
83

M
Minghao Li 已提交
84 85
// Report the sender's `start` flag, i.e. whether a snapshot transfer is in progress.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

86 87 88 89 90 91
// begin send snapshot by param, snapshot, pReader
//
// action:
// 1. assert reader not start
// 2. update state
// 3. send first snapshot block
92 93
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
                            void *pReader) {
M
Minghao Li 已提交
94
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
95

96
  // init snapshot, parm, reader
M
Minghao Li 已提交
97
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
98 99
  pSender->pReader = pReader;
  pSender->snapshot = snapshot;
100
  pSender->snapshotParam = snapshotParam;
M
Minghao Li 已提交
101

102
  // init current block
M
Minghao Li 已提交
103 104 105 106 107
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }
  pSender->blockLen = 0;

108 109
  // update term
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
110
  ++(pSender->privateTerm);  // increase private term
111 112 113 114 115 116 117 118

  // update state
  pSender->finish = false;
  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // init last config
119
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
120 121
    int32_t         code = 0;
    SSyncRaftEntry *pEntry = NULL;
122 123
    bool            getLastConfig = false;

124 125
    code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                          pSender->snapshot.lastConfigIndex, &pEntry);
126
    if (code == 0 && pEntry != NULL) {
127 128 129 130 131 132 133 134 135 136 137 138 139
      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pEntry, &rpcMsg);

      SSyncCfg lastConfig;
      int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
      ASSERT(ret == 0);
      pSender->lastConfig = lastConfig;
      getLastConfig = true;

      rpcFreeCont(rpcMsg.pCont);
      syncEntryDestory(pEntry);
    } else {
      if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
140
        sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
141 142 143 144 145
        pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
        getLastConfig = true;
      }
    }

146
    // last config not found in wal, update to -1
147
    if (!getLastConfig) {
148 149 150
      SyncIndex oldLastConfigIndex = pSender->snapshot.lastConfigIndex;
      SyncIndex newLastConfigIndex = SYNC_INDEX_INVALID;
      pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
151
      memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
152 153 154 155 156 157 158 159 160 161

      // event log
      do {
        char logBuf[128];
        snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %ld to %ld", oldLastConfigIndex,
                 newLastConfigIndex);
        char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
        syncNodeEventLog(pSender->pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);
162
    }
163 164

  } else {
165
    // no last config
166 167
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }
M
Minghao Li 已提交
168

M
Minghao Li 已提交
169
  // build begin msg
M
Minghao Li 已提交
170 171 172 173
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
174
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
175 176
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
177 178
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
179
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
180
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
181

M
Minghao Li 已提交
182
  // send msg
M
Minghao Li 已提交
183 184 185 186
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pMsg);
187 188 189

  // event log
  do {
190
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
191 192 193
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
194 195

  return 0;
M
Minghao Li 已提交
196 197
}

198
// Stop the sender: close the FSM reader, drop the cached block, clear the
// `start` flag and record `finish`. The term fields are left untouched.
// Always returns 0.
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  // close reader
  if (pSender->pReader != NULL) {
    int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(ret == 0);
    pSender->pReader = NULL;
  }

  // free current block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  // update flag
  pSender->start = false;
  pSender->finish = finish;

  // do not update term, maybe print

  // event log
  {
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  }

  return 0;
}

229
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
230
// seq = ack + 1, already updated
M
Minghao Li 已提交
231
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
232
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
233 234
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
235
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
236 237 238 239 240 241 242
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
243 244 245
  if (pSender->blockLen > 0) {
    // has read data
  } else {
246
    // read finish, update seq to end
M
Minghao Li 已提交
247 248
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
249

M
Minghao Li 已提交
250
  // build msg
M
Minghao Li 已提交
251 252 253 254
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
255
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
256 257
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
258 259
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
260
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
261
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
262 263
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
264
  // send msg
M
Minghao Li 已提交
265 266 267
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
268
  syncSnapshotSendDestroy(pMsg);
M
Minghao Li 已提交
269

270 271 272 273 274 275 276 277
  // event log
  do {
    char *eventLog = NULL;
    if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender finish");
    } else {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender sending");
    }
M
Minghao Li 已提交
278 279
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
280
  } while (0);
M
Minghao Li 已提交
281

M
Minghao Li 已提交
282 283 284
  return 0;
}

M
Minghao Li 已提交
285 286
// Re-send the block cached in pSender->pCurrentBlock with the current seq.
// Nothing is sent when no block is cached or its length is not positive.
// Always returns 0.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock == NULL || pSender->blockLen <= 0) {
    // no cached data to resend
    return 0;
  }

  // build msg
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;
  pMsg->privateTerm = pSender->privateTerm;
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pMsg);

  // event log
  {
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender resend");
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  }

  return 0;
}

320 321 322 323 324 325
// Record the peer's ack and move the sender on to the next sequence number.
// The ack is asserted to equal the sequence number that was last sent.
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  ASSERT(pMsg->ack == pSender->seq);

  pSender->ack = pMsg->ack;
  pSender->seq += 1;
}

M
Minghao Li 已提交
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
// Build a cJSON tree describing the sender, wrapped as
// { "SSyncSnapshotSender": { ... } }. A NULL sender yields an empty inner object.
// The returned tree is owned by the caller (snapshotSender2Str releases it with cJSON_Delete).
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pSender->start);
    cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
    cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
    cJSON_AddStringToObject(pRoot, "pReader", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      // NOTE(review): this adds a second member under the key "pCurrentBlock"
      // (the pointer string above already uses it); kept as-is to preserve output
      char *s;
      s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
      taosMemoryFree(s);
      s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
      taosMemoryFree(s);
    }

    // nested "snapshot" object
    cJSON *pSnapshot = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
    cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);

    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
    cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
  return pJson;
}

// Serialise the sender to a JSON string via snapshotSender2Json() and cJSON_Print().
// The intermediate cJSON tree is deleted here; the returned buffer passes to the caller.
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pJson = snapshotSender2Json(pSender);
  char  *serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
382 383
// Format a one-line description of the sender, prefixed with `event`, for event logs.
//
// Returns a buffer of at most 256 bytes allocated with taosMemoryMalloc (callers
// in this file release it with taosMemoryFree), or NULL if the allocation fails.
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
  int32_t len = 256;
  char   *s = taosMemoryMalloc(len);
  if (s == NULL) {
    // never hand a NULL buffer to snprintf
    return NULL;
  }

  // resolve the destination replica's host/port for display
  SRaftId  destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(destId.addr, host, sizeof(host), &port);

  snprintf(s, len,
           "%s {%p s-param:%ld e-param:%ld laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu "
           "replica-index:%d %s:%d}",
           event, pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
           pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
           pSender->finish, pSender->privateTerm, pSender->replicaIndex, host, port);

  return s;
}

M
Minghao Li 已提交
401
// -------------------------------------
402
// Allocate and initialise a snapshot receiver that accepts data from `fromId`.
//
// A receiver is only created when the FSM provides all three snapshot write
// callbacks (FpSnapshotStartWrite / FpSnapshotStopWrite / FpSnapshotDoWrite).
// Returns NULL when one of them is missing or when the allocation fails.
// A non-NULL result is released with snapshotReceiverDestroy().
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  bool canWrite = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                  (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
  if (!canWrite) {
    sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    // report allocation failure to the caller instead of asserting on it
    sError("vgId:%d, cannot create snapshot receiver, out of memory", pSyncNode->vgId);
    return NULL;
  }
  memset(pReceiver, 0, sizeof(*pReceiver));

  pReceiver->start = false;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;
  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = pSyncNode->pRaftStore->currentTerm;
  pReceiver->privateTerm = 0;
  pReceiver->snapshot.data = NULL;
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pReceiver->snapshot.lastApplyTerm = 0;
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  return pReceiver;
}
M
Minghao Li 已提交
430

431 432
// Release a receiver created by snapshotReceiverCreate(). If a writer is still
// open it is stopped with the last argument `false`, then the receiver is freed.
// A NULL receiver is accepted and ignored.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer
  if (pReceiver->pWriter != NULL) {
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
  }

  // free receiver
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
445

446 447
// Report the receiver's `start` flag, i.e. whether it is currently receiving a snapshot.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

448
// static do start by privateTerm, pBeginMsg
449
// receive first snapshot data
450
// write first block data
451
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
452
  // update state
M
Minghao Li 已提交
453
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
454
  pReceiver->privateTerm = pBeginMsg->privateTerm;
M
Minghao Li 已提交
455
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
M
Minghao Li 已提交
456
  pReceiver->fromId = pBeginMsg->srcId;
457
  pReceiver->start = true;
M
Minghao Li 已提交
458

459
  // update snapshot
M
Minghao Li 已提交
460 461 462
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
463 464
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
M
Minghao Li 已提交
465

466
  // start writer
M
Minghao Li 已提交
467
  ASSERT(pReceiver->pWriter == NULL);
468 469
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
                                                                 &(pReceiver->snapshotParam), &(pReceiver->pWriter));
M
Minghao Li 已提交
470
  ASSERT(ret == 0);
471 472 473 474 475 476 477

  // event log
  do {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
M
Minghao Li 已提交
478 479
}

480
// force stop
481 482 483 484 485 486 487 488 489 490
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
  // force close, abandon incomplete data
  if (pReceiver->pWriter != NULL) {
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
491 492 493 494 495 496 497

  // event log
  do {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
498 499
}

M
Minghao Li 已提交
500 501
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
502
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
503
  if (!snapshotReceiverIsStart(pReceiver)) {
504
    // first start
505
    snapshotReceiverDoStart(pReceiver, pBeginMsg);
M
Minghao Li 已提交
506

507
  } else {
M
Minghao Li 已提交
508
    // already start
509
    sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
M
Minghao Li 已提交
510 511

    // force close, abandon incomplete data
512
    snapshotReceiverForceStop(pReceiver);
M
Minghao Li 已提交
513 514

    // start again
515
    snapshotReceiverDoStart(pReceiver, pBeginMsg);
516
  }
517 518

  return 0;
519
}
M
Minghao Li 已提交
520

521 522
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
523
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
M
Minghao Li 已提交
524 525 526 527 528
  if (pReceiver->pWriter != NULL) {
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
529
  }
M
Minghao Li 已提交
530 531

  pReceiver->start = false;
532

533 534 535 536 537 538 539 540
  // event log
  do {
    SSnapshot snapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
541 542

  return 0;
543 544
}

545
// when recv last snapshot block, apply data into snapshot
546
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
547 548
  ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);

549
  int32_t code = 0;
550
  if (pReceiver->pWriter != NULL) {
551
    // write data
552 553 554
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
555 556 557 558 559 560 561 562 563 564 565 566
      if (code != 0) {
        syncNodeErrorLog(pReceiver->pSyncNode, "snapshot write error");
        return -1;
      }
    }

    // reset wal
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "wal restore from snapshot error");
      return -1;
567 568
    }

569 570 571 572 573
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

574
    // stop writer, apply data
575
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
576 577 578 579 580
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
      ASSERT(0);
      return -1;
    }
581 582
    pReceiver->pWriter = NULL;

583 584
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
585

586 587 588
  } else {
    syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
    return -1;
589 590 591 592 593 594 595 596 597 598
  }

  // event log
  do {
    SSnapshot snapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
599 600

  return 0;
601 602
}

603 604
// apply data block
// update progress
605 606 607 608 609
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  ASSERT(pMsg->seq == pReceiver->ack + 1);

  if (pReceiver->pWriter != NULL) {
    if (pMsg->dataLen > 0) {
610
      // apply data block
611 612 613 614
      int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   pMsg->data, pMsg->dataLen);
      ASSERT(code == 0);
    }
615 616

    // update progress
617 618 619 620 621 622 623 624
    pReceiver->ack = pMsg->seq;

    // event log
    do {
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
      syncNodeEventLog(pReceiver->pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);
625
  }
626 627 628 629 630
}

// Build a cJSON tree describing the receiver, wrapped as
// { "SSyncSnapshotReceiver": { ... } }. A NULL receiver yields an empty inner object.
// The returned tree is owned by the caller (snapshotReceiver2Str releases it with cJSON_Delete).
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pReceiver != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
    cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pRoot, "pWriter", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);

    // nested "fromId" object: raw address plus its decoded host/port
    cJSON *pFromId = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->fromId.addr);
    cJSON_AddStringToObject(pFromId, "addr", u64buf);

    char     host[128] = {0};
    uint16_t port;
    syncUtilU642Addr(pReceiver->fromId.addr, host, sizeof(host), &port);
    cJSON_AddStringToObject(pFromId, "addr_host", host);
    cJSON_AddNumberToObject(pFromId, "addr_port", port);

    cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pRoot, "fromId", pFromId);

    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastConfigIndex);
    cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
  return pJson;
}

// Serialise the receiver to a JSON string via snapshotReceiver2Json() and cJSON_Print().
// The intermediate cJSON tree is deleted here; the returned buffer passes to the caller.
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pJson = snapshotReceiver2Json(pReceiver);
  char  *serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
}
M
Minghao Li 已提交
684

M
Minghao Li 已提交
685 686
// Format a one-line description of the receiver, prefixed with `event`, for event logs.
//
// Returns a buffer of at most 256 bytes allocated with taosMemoryMalloc (callers
// in this file release it with taosMemoryFree), or NULL if the allocation fails.
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
  int32_t len = 256;
  char   *s = taosMemoryMalloc(len);
  if (s == NULL) {
    // never hand a NULL buffer to snprintf
    return NULL;
  }

  // resolve the sending node's host/port for display
  SRaftId  fromId = pReceiver->fromId;
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);

  snprintf(s, len,
           "%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d s-param:%ld e-param:%ld laindex:%ld laterm:%lu "
           "lcindex:%ld}",
           event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port,
           pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex,
           pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);

  return s;
}

704 705 706 707 708 709
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_BEGIN, start receiver, update privateTerm
// condition 2, recv SYNC_SNAPSHOT_SEQ_END, finish receiver (apply snapshot data, update commit index, maybe reconfig)
// condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
// condition 4, got data, update ack
//
M
Minghao Li 已提交
711
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
712
  // get receiver
713 714
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
715
  int32_t                code = 0;
M
Minghao Li 已提交
716

717 718
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
719 720
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
721
        // condition 1
722
        // begin, no data
723
        snapshotReceiverStart(pReceiver, pMsg);
M
Minghao Li 已提交
724 725 726
        needRsp = true;

      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
727
        // condition 2
M
Minghao Li 已提交
728
        // end, finish FSM
729 730 731 732
        code = snapshotReceiverFinish(pReceiver, pMsg);
        if (code == 0) {
          snapshotReceiverStop(pReceiver);
        }
733
        needRsp = true;
734

735 736
        // maybe update lastconfig
        if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
M
Minghao Li 已提交
737
          SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
738

739 740 741
          // update new config myIndex
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
M
Minghao Li 已提交
742 743 744

          // do config change
          syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
745 746
        }

M
Minghao Li 已提交
747
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
748
        // condition 3
749 750
        // force close
        snapshotReceiverForceStop(pReceiver);
M
Minghao Li 已提交
751 752 753
        needRsp = false;

      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
754
        // condition 4
M
Minghao Li 已提交
755 756
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
757
          snapshotReceiverGotData(pReceiver, pMsg);
M
Minghao Li 已提交
758 759 760 761
        }
        needRsp = true;

      } else {
M
Minghao Li 已提交
762 763 764 765 766 767 768 769 770 771
        // error log
        do {
          char logBuf[96];
          snprintf(logBuf, sizeof(logBuf), "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
          char *eventLog = snapshotReceiver2SimpleStr(pReceiver, logBuf);
          syncNodeErrorLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);

        return -1;
772
      }
M
Minghao Li 已提交
773

774
      // send ack
M
Minghao Li 已提交
775
      if (needRsp) {
776
        // build msg
M
Minghao Li 已提交
777 778 779 780 781 782
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
783 784 785
        pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
        pRspMsg->code = 0;
        pRspMsg->privateTerm = pReceiver->privateTerm;  // receiver maybe already closed
M
Minghao Li 已提交
786

787
        // send msg
M
Minghao Li 已提交
788 789 790 791 792
        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
        syncSnapshotRspDestroy(pRspMsg);
      }
793

794 795 796 797 798 799 800
    } else {
      // error log
      do {
        char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
        syncNodeErrorLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);
801 802

      return -1;
M
Minghao Li 已提交
803
    }
M
Minghao Li 已提交
804
  } else {
805 806 807 808 809 810
    // error log
    do {
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);
811 812

    return -1;
M
Minghao Li 已提交
813
  }
M
Minghao Li 已提交
814

M
Minghao Li 已提交
815 816 817
  return 0;
}

818 819 820 821 822 823
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
824
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
825 826
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
827 828
    sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
    return -1;
829 830
  }

M
Minghao Li 已提交
831
  // get sender
832
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
833 834 835 836 837
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
838 839
      // condition 1
      // receive ack is finish, close sender
M
Minghao Li 已提交
840
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
841
        snapshotSenderStop(pSender, true);
M
Minghao Li 已提交
842 843 844
        return 0;
      }

845
      // condition 2
M
Minghao Li 已提交
846
      // send next msg
847
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
848
        // update sender ack
849
        snapshotSenderUpdateProgress(pSender, pMsg);
M
Minghao Li 已提交
850
        snapshotSend(pSender);
851

M
Minghao Li 已提交
852
      } else if (pMsg->ack == pSender->seq - 1) {
853
        // maybe resend
M
Minghao Li 已提交
854
        snapshotReSend(pSender);
855

M
Minghao Li 已提交
856
      } else {
857
        // error log
858
        do {
M
Minghao Li 已提交
859 860
          char logBuf[96];
          snprintf(logBuf, sizeof(logBuf), "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
861 862 863 864 865 866
          char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
          syncNodeErrorLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);

        return -1;
867
      }
868 869 870 871 872 873 874 875 876
    } else {
      // error log
      do {
        char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
        syncNodeErrorLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);

      return -1;
877
    }
M
Minghao Li 已提交
878
  } else {
879 880 881 882 883 884 885 886
    // error log
    do {
      char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);

    return -1;
887 888 889
  }

  return 0;
890
}