syncSnapshot.c 31.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
//----------------------------------
25 26 27 28
// Forward declarations of the file-local helpers defined further down in this file.
// sender: record the acknowledged seq and advance to the next seq
static void    snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
// receiver: take over the state carried by the begin message and open the FSM snapshot writer
static void    snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
// receiver: write one data block through the FSM writer and advance ack
static void    snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
// receiver: handle the end message (last write, log restore, stop writer); returns 0 or -1
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
M
Minghao Li 已提交
29

30
//----------------------------------
M
Minghao Li 已提交
31
// Create a snapshot sender bound to the replica at `replicaIndex`.
//
// Returns NULL when the FSM lacks any of the snapshot-read callbacks
// (an error is logged) or when allocation fails (terrno is set to
// TSDB_CODE_OUT_OF_MEMORY). The returned sender is not started.
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // all three read callbacks are required
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  memset(pNew, 0, sizeof(*pNew));

  // ownership / identity
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;

  // transfer state: idle, nothing sent, nothing acknowledged
  pNew->start = false;
  pNew->finish = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // no reader and no cached block yet
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;

  // terms: raft term from the store, private term seeded from the current timestamp
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = taosGetTimestampMs() + 100;

  // ask the FSM for its current snapshot info
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &(pNew->snapshot));

  return pNew;
}
M
Minghao Li 已提交
63

M
Minghao Li 已提交
64 65
// Release a snapshot sender together with the resources it still holds.
// Passing NULL is a no-op. A failure to stop the reader is logged, not fatal.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // cached data block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // reader handle, closed through the FSM
  if (pSender->pReader != NULL) {
    int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    if (ret != 0) {
      syncNodeErrorLog(pSender->pSyncNode, "stop reader error");
    }
    pSender->pReader = NULL;
  }

  // the sender object itself
  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
85

M
Minghao Li 已提交
86 87
// Report the sender's `start` flag.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

88 89 90 91 92 93
// begin send snapshot by param, snapshot, pReader
//
// action:
// 1. assert reader not start
// 2. update state
// 3. send first snapshot block
94 95
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
                            void *pReader) {
M
Minghao Li 已提交
96
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
97

98
  // init snapshot, parm, reader
M
Minghao Li 已提交
99
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
100 101
  pSender->pReader = pReader;
  pSender->snapshot = snapshot;
102
  pSender->snapshotParam = snapshotParam;
M
Minghao Li 已提交
103

104
  // init current block
M
Minghao Li 已提交
105 106 107 108 109
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }
  pSender->blockLen = 0;

110 111
  // update term
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
112
  ++(pSender->privateTerm);  // increase private term
113 114 115 116 117 118 119 120

  // update state
  pSender->finish = false;
  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // init last config
121
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
122 123
    int32_t         code = 0;
    SSyncRaftEntry *pEntry = NULL;
124 125
    bool            getLastConfig = false;

126 127
    code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                          pSender->snapshot.lastConfigIndex, &pEntry);
128
    if (code == 0 && pEntry != NULL) {
129 130 131 132 133 134 135 136 137 138 139 140 141
      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pEntry, &rpcMsg);

      SSyncCfg lastConfig;
      int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
      ASSERT(ret == 0);
      pSender->lastConfig = lastConfig;
      getLastConfig = true;

      rpcFreeCont(rpcMsg.pCont);
      syncEntryDestory(pEntry);
    } else {
      if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
142
        sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
143 144 145 146 147
        pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
        getLastConfig = true;
      }
    }

148
    // last config not found in wal, update to -1
149
    if (!getLastConfig) {
150 151 152
      SyncIndex oldLastConfigIndex = pSender->snapshot.lastConfigIndex;
      SyncIndex newLastConfigIndex = SYNC_INDEX_INVALID;
      pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
153
      memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
154 155 156 157

      // event log
      do {
        char logBuf[128];
158 159
        snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %" PRId64 " to %" PRId64,
                 oldLastConfigIndex, newLastConfigIndex);
160 161 162 163
        char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
        syncNodeEventLog(pSender->pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);
164
    }
165 166

  } else {
167
    // no last config
168 169
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }
M
Minghao Li 已提交
170

M
Minghao Li 已提交
171
  // build begin msg
M
Minghao Li 已提交
172 173 174 175
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
176
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
177 178
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
179 180
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
181
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
182
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
183

M
Minghao Li 已提交
184
  // send msg
M
Minghao Li 已提交
185 186 187 188
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pMsg);
189 190 191

  // event log
  do {
192
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
193 194 195
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
196 197

  return 0;
M
Minghao Li 已提交
198 199
}

200
// Stop the sender: close the reader, drop the cached block and record
// whether the transfer finished. The term fields are left untouched so
// they can still be printed. Always returns 0.
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  // reader handle is closed through the FSM; a failure here is asserted
  if (pSender->pReader != NULL) {
    int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(ret == 0);
    pSender->pReader = NULL;
  }

  // cached data block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->blockLen = 0;
    pSender->pCurrentBlock = NULL;
  }

  // flags
  pSender->finish = finish;
  pSender->start = false;

  // event log
  char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
  syncNodeEventLog(pSender->pSyncNode, eventLog);
  taosMemoryFree(eventLog);

  return 0;
}

231
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
232
// seq = ack + 1, already updated
M
Minghao Li 已提交
233
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
234
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
235 236
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
237
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
238 239 240 241 242 243 244
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
245 246 247
  if (pSender->blockLen > 0) {
    // has read data
  } else {
248
    // read finish, update seq to end
M
Minghao Li 已提交
249 250
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
251

M
Minghao Li 已提交
252
  // build msg
M
Minghao Li 已提交
253 254 255 256
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
257
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
258 259
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
260 261
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
262
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
263
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
264 265
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
266
  // send msg
M
Minghao Li 已提交
267 268 269
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
270
  syncSnapshotSendDestroy(pMsg);
M
Minghao Li 已提交
271

272 273 274 275 276 277 278 279
  // event log
  do {
    char *eventLog = NULL;
    if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender finish");
    } else {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender sending");
    }
M
Minghao Li 已提交
280 281
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
282
  } while (0);
M
Minghao Li 已提交
283

M
Minghao Li 已提交
284 285 286
  return 0;
}

M
Minghao Li 已提交
287 288
// Send the cached block again with the current seq.
// Does nothing when no block is cached. Always returns 0.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  // nothing cached, nothing to resend
  if (pSender->pCurrentBlock == NULL || pSender->blockLen <= 0) {
    return 0;
  }

  // build msg carrying the cached block
  SyncSnapshotSend *pResend = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pResend->srcId = pSender->pSyncNode->myRaftId;
  pResend->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pResend->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pResend->privateTerm = pSender->privateTerm;
  pResend->seq = pSender->seq;
  pResend->beginIndex = pSender->snapshotParam.start;
  pResend->lastIndex = pSender->snapshot.lastApplyIndex;
  pResend->lastTerm = pSender->snapshot.lastApplyTerm;
  pResend->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pResend->lastConfig = pSender->lastConfig;
  memcpy(pResend->data, pSender->pCurrentBlock, pSender->blockLen);

  // wrap into an rpc msg and send
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pResend, &rpcMsg);
  syncNodeSendMsgById(&(pResend->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pResend);

  // event log
  char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender resend");
  syncNodeEventLog(pSender->pSyncNode, eventLog);
  taosMemoryFree(eventLog);

  return 0;
}

322 323 324 325 326 327
// Record the ack carried by a response and move on to the next seq.
// The response must acknowledge exactly the current seq (asserted).
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  ASSERT(pMsg->ack == pSender->seq);

  pSender->ack = pMsg->ack;
  pSender->seq += 1;
}

M
Minghao Li 已提交
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
// Dump a snapshot sender into a cJSON tree of the form
// { "SSyncSnapshotSender": { ...fields... } }.
// A NULL sender yields an empty inner object. The returned tree is handed to
// the caller (snapshotSender2Str releases it with cJSON_Delete).
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pSender->start);
    cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
    cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
    cJSON_AddStringToObject(pRoot, "pReader", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      char *s;
      s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      // NOTE(review): this reuses the key "pCurrentBlock" already added above for the
      // pointer value, so the object ends up with two members of the same name — confirm
      // whether a distinct key was intended.
      cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
      taosMemoryFree(s);
      s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
      taosMemoryFree(s);
    }

    cJSON *pSnapshot = cJSON_CreateObject();
    // lastApplyIndex is printed as signed, matching snapshotSender2SimpleStr (laindex)
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
    cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->sendingMS);
    cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
    cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
  }

  // wrap under the type name
  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
  return pJson;
}

// Serialize a snapshot sender to a cJSON_Print string.
// The intermediate JSON tree is deleted here; the returned string is handed to the caller.
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
384 385
// Build a one-line description of the sender, prefixed with `event`.
// The result is a freshly allocated 256-byte buffer; callers in this file
// release it with taosMemoryFree.
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
  int32_t bufLen = 256;
  char   *buf = taosMemoryMalloc(bufLen);

  // resolve host:port of the destination replica
  char     peerHost[64];
  uint16_t peerPort;
  SRaftId  peerId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  syncUtilU642Addr(peerId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(buf, bufLen,
           "%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64
           " lcindex:%" PRId64 " seq:%d ack:%d finish:%d pterm:%" PRIu64 " replica-index:%d %s:%d}",
           event, pSender, pSender->snapshotParam.start, pSender->snapshotParam.end,
           pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
           pSender->seq, pSender->ack, pSender->finish, pSender->privateTerm, pSender->replicaIndex, peerHost,
           peerPort);

  return buf;
}

M
Minghao Li 已提交
405
// -------------------------------------
406
// Create a snapshot receiver that expects data from `fromId`.
//
// Returns NULL when the FSM lacks any of the snapshot-write callbacks
// (an error is logged) or when allocation fails (terrno is set to
// TSDB_CODE_OUT_OF_MEMORY, the same handling as snapshotSenderCreate).
// The returned receiver is not started and has no writer.
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);

  if (!condition) {
    sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
    return NULL;
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    // report out-of-memory to the caller instead of asserting
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  memset(pReceiver, 0, sizeof(*pReceiver));

  pReceiver->start = false;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;
  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = pSyncNode->pRaftStore->currentTerm;
  pReceiver->privateTerm = 0;

  // no snapshot known yet
  pReceiver->snapshot.data = NULL;
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pReceiver->snapshot.lastApplyTerm = 0;
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  return pReceiver;
}
M
Minghao Li 已提交
434

435 436
// Release a snapshot receiver. An open writer is stopped first with the
// flag set to false; a failure to stop it is asserted.
// Passing NULL is a no-op.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // writer handle, closed through the FSM
  if (pReceiver->pWriter != NULL) {
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                  false, &(pReceiver->snapshot));
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
  }

  // the receiver object itself
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
449

450 451
// Report the receiver's `start` flag.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

452
// static do start by privateTerm, pBeginMsg
453
// receive first snapshot data
454
// write first block data
455
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
456
  // update state
M
Minghao Li 已提交
457
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
458
  pReceiver->privateTerm = pBeginMsg->privateTerm;
M
Minghao Li 已提交
459
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
M
Minghao Li 已提交
460
  pReceiver->fromId = pBeginMsg->srcId;
461
  pReceiver->start = true;
M
Minghao Li 已提交
462

463
  // update snapshot
M
Minghao Li 已提交
464 465 466
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
467 468
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
M
Minghao Li 已提交
469

470
  // start writer
M
Minghao Li 已提交
471
  ASSERT(pReceiver->pWriter == NULL);
472 473
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
                                                                 &(pReceiver->snapshotParam), &(pReceiver->pWriter));
M
Minghao Li 已提交
474
  ASSERT(ret == 0);
475 476 477 478 479 480 481

  // event log
  do {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
M
Minghao Li 已提交
482 483
}

484
// force stop
485
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
486 487
  // force close, abandon incomplete data
  if (pReceiver->pWriter != NULL) {
488 489
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                                  &(pReceiver->snapshot));
490 491 492 493 494
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
495 496 497 498 499 500 501

  // event log
  do {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
502 503
}

M
Minghao Li 已提交
504 505
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
506
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
507
  if (!snapshotReceiverIsStart(pReceiver)) {
508
    // first start
509
    snapshotReceiverDoStart(pReceiver, pBeginMsg);
M
Minghao Li 已提交
510

511
  } else {
M
Minghao Li 已提交
512
    // already start
513
    sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
M
Minghao Li 已提交
514 515

    // force close, abandon incomplete data
516
    snapshotReceiverForceStop(pReceiver);
M
Minghao Li 已提交
517 518

    // start again
519
    snapshotReceiverDoStart(pReceiver, pBeginMsg);
520
  }
521 522

  return 0;
523
}
M
Minghao Li 已提交
524

525 526
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
527
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
M
Minghao Li 已提交
528
  if (pReceiver->pWriter != NULL) {
529 530
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                                  &(pReceiver->snapshot));
M
Minghao Li 已提交
531 532
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
533
  }
M
Minghao Li 已提交
534 535

  pReceiver->start = false;
536

537 538 539 540 541 542 543 544
  // event log
  do {
    SSnapshot snapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
545 546

  return 0;
547 548
}

549
// when recv last snapshot block, apply data into snapshot
550
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
551 552
  ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);

553
  int32_t code = 0;
554
  if (pReceiver->pWriter != NULL) {
555
    // write data
556 557 558
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
559 560 561 562 563 564 565 566 567 568 569 570
      if (code != 0) {
        syncNodeErrorLog(pReceiver->pSyncNode, "snapshot write error");
        return -1;
      }
    }

    // reset wal
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "wal restore from snapshot error");
      return -1;
571 572
    }

573 574 575 576 577
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

M
Minghao Li 已提交
578 579 580 581 582 583
    // maybe update term
    if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
      pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
      raftStorePersist(pReceiver->pSyncNode->pRaftStore);
    }

584
    // stop writer, apply data
585 586
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
                                                           &(pReceiver->snapshot));
587 588
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
589
      // ASSERT(0);
590 591
      return -1;
    }
592 593
    pReceiver->pWriter = NULL;

594 595
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
596

597 598 599
  } else {
    syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
    return -1;
600 601 602 603 604 605 606 607 608 609
  }

  // event log
  do {
    SSnapshot snapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
610 611

  return 0;
612 613
}

614 615
// apply data block
// update progress
616 617 618 619 620
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  ASSERT(pMsg->seq == pReceiver->ack + 1);

  if (pReceiver->pWriter != NULL) {
    if (pMsg->dataLen > 0) {
621
      // apply data block
622 623 624 625
      int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   pMsg->data, pMsg->dataLen);
      ASSERT(code == 0);
    }
626 627

    // update progress
628 629 630 631 632 633 634 635
    pReceiver->ack = pMsg->seq;

    // event log
    do {
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
      syncNodeEventLog(pReceiver->pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);
636
  }
637 638 639 640 641
}

// Dump a snapshot receiver into a cJSON tree of the form
// { "SSyncSnapshotReceiver": { ...fields... } }.
// A NULL receiver yields an empty inner object. The returned tree is handed to
// the caller (snapshotReceiver2Str releases it with cJSON_Delete).
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pReceiver != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
    cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pRoot, "pWriter", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);

    // sender identity: raw address plus its decoded host/port form
    cJSON *pFromId = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->fromId.addr);
    cJSON_AddStringToObject(pFromId, "addr", u64buf);
    {
      uint64_t u64 = pReceiver->fromId.addr;
      cJSON   *pTmp = pFromId;
      char     host[128] = {0};
      uint16_t port;
      syncUtilU642Addr(u64, host, sizeof(host), &port);
      cJSON_AddStringToObject(pTmp, "addr_host", host);
      cJSON_AddNumberToObject(pTmp, "addr_port", port);
    }
    cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pRoot, "fromId", pFromId);

    // indexes are printed as signed, matching snapshotReceiver2SimpleStr (laindex / lcindex);
    // both start out as SYNC_INDEX_INVALID in snapshotReceiverCreate
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->snapshot.lastConfigIndex);
    cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
  }

  // wrap under the type name
  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
  return pJson;
}

// Serialize a snapshot receiver to a cJSON_Print string.
// The intermediate JSON tree is deleted here; the returned string is handed to the caller.
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return text;
}
M
Minghao Li 已提交
695

M
Minghao Li 已提交
696 697
// Build a one-line description of the receiver, prefixed with `event`.
// The result is a freshly allocated 256-byte buffer; callers in this file
// release it with taosMemoryFree.
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
  int32_t bufLen = 256;
  char   *buf = taosMemoryMalloc(bufLen);

  // resolve host:port of the node the snapshot comes from
  char     srcHost[128];
  uint16_t srcPort;
  SRaftId  srcId = pReceiver->fromId;
  syncUtilU642Addr(srcId.addr, srcHost, sizeof(srcHost), &srcPort);

  snprintf(buf, bufLen,
           "%s {%p start:%d ack:%d term:%" PRIu64 " pterm:%" PRIu64 " from:%s:%d s-param:%" PRId64
           " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 "}",
           event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, srcHost,
           srcPort, pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
           pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
           pReceiver->snapshot.lastConfigIndex);

  return buf;
}

// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_BEGIN, start receiver, update privateTerm
// condition 2, recv SYNC_SNAPSHOT_SEQ_END, finish receiver (apply snapshot data, update commit index, maybe reconfig)
// condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
// condition 4, got data, update ack
//
// Handle a SyncSnapshotSend message on the receiving node.
//
// The message is accepted only when this node is a follower and pMsg->term equals the
// local current term; otherwise an error line is logged and -1 is returned. Accepted
// messages are dispatched on pMsg->seq:
//   SYNC_SNAPSHOT_SEQ_BEGIN        start the receiver, reply
//   SYNC_SNAPSHOT_SEQ_END          finish the receiver (stopped only when finishing succeeds),
//                                  apply the last config if the message carries one, reply
//   SYNC_SNAPSHOT_SEQ_FORCE_CLOSE  force-stop the receiver, no reply
//   BEGIN < seq < END              pass the data to the receiver when seq == ack + 1, reply
//   anything else                  log an error, return -1
//
// The reply carries the receiver's current ack and privateTerm, and reports the result of
// snapshotReceiverFinish() in its code field (0 for every other sequence).
//
// Returns 0 when the message was dispatched (a failed finish is reported through the
// reply, not through the return value), -1 when the message was rejected.
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // get receiver
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
  int32_t                code = 0;

  // state check: only a follower processes snapshot data
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
    syncNodeErrorLog(pSyncNode, eventLog);
    taosMemoryFree(eventLog);
    return -1;
  }

  // term check
  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
    syncNodeErrorLog(pSyncNode, eventLog);
    taosMemoryFree(eventLog);
    return -1;
  }

  // seq/ack
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
    // condition 1
    // begin, no data
    snapshotReceiverStart(pReceiver, pMsg);
    needRsp = true;

  } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
    // condition 2
    // end, finish FSM; keep the receiver running if finishing failed
    code = snapshotReceiverFinish(pReceiver, pMsg);
    if (code == 0) {
      snapshotReceiverStop(pReceiver);
    }
    needRsp = true;

    // maybe update lastconfig
    // NOTE(review): this runs even when snapshotReceiverFinish() failed — confirm that is intended
    if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
      // update new config myIndex
      SSyncCfg newSyncCfg = pMsg->lastConfig;
      syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);

      // do config change
      syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
    }

  } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
    // condition 3
    // force close
    snapshotReceiverForceStop(pReceiver);
    needRsp = false;

  } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
    // condition 4
    // transferring: only the next expected block is consumed, but every block is acked
    if (pMsg->seq == pReceiver->ack + 1) {
      snapshotReceiverGotData(pReceiver, pMsg);
    }
    needRsp = true;

  } else {
    // error log
    char logBuf[96];
    snprintf(logBuf, sizeof(logBuf), "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, logBuf);
    syncNodeErrorLog(pSyncNode, eventLog);
    taosMemoryFree(eventLog);
    return -1;
  }

  if (!needRsp) {
    return 0;
  }

  // build ack msg
  SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = code;           // non-zero only when snapshotReceiverFinish() failed
  pRspMsg->privateTerm = pReceiver->privateTerm;  // receiver maybe already closed

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
  syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
  syncSnapshotRspDestroy(pRspMsg);

  return 0;
}

// sender on message
//
// condition 1, sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2, sender receives ack, set seq = ack + 1, send msg from seq
// condition 3, sender receives error msg, just print error log
//
// Handle a SyncSnapshotRsp message on the sending node.
//
// The response is rejected (error logged, -1 returned) when:
//   - this node is leader and the source is not in the raft group,
//   - no snapshot sender exists for the source,
//   - this node is not leader,
//   - pMsg->term differs from the local current term,
//   - pMsg->ack is neither SYNC_SNAPSHOT_SEQ_END, pSender->seq, nor pSender->seq - 1.
//
// Otherwise:
//   ack == SYNC_SNAPSHOT_SEQ_END  stop the sender
//   ack == pSender->seq           update the sender progress and send the next block
//   ack == pSender->seq - 1       resend
//
// Returns 0 when the response was handled, -1 when it was rejected.
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
    return -1;
  }

  // get sender; a response from a peer without a sender must not bring the process down
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
  if (pSender == NULL) {
    sError("vgId:%d, recv sync-snapshot-rsp, snapshot sender not found", pSyncNode->vgId);
    return -1;
  }

  // state check
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
    syncNodeErrorLog(pSyncNode, eventLog);
    taosMemoryFree(eventLog);
    return -1;
  }

  // term check
  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
    syncNodeErrorLog(pSyncNode, eventLog);
    taosMemoryFree(eventLog);
    return -1;
  }

  // condition 1
  // receive ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    snapshotSenderStop(pSender, true);
    return 0;
  }

  // condition 2
  if (pMsg->ack == pSender->seq) {
    // update sender ack, then send next msg
    snapshotSenderUpdateProgress(pSender, pMsg);
    snapshotSend(pSender);

  } else if (pMsg->ack == pSender->seq - 1) {
    // maybe resend
    snapshotReSend(pSender);

  } else {
    // condition 3
    // error log
    char logBuf[96];
    snprintf(logBuf, sizeof(logBuf), "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
    char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
    syncNodeErrorLog(pSyncNode, eventLog);
    taosMemoryFree(eventLog);
    return -1;
  }

  return 0;
}