syncSnapshot.c 30.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
//----------------------------------
25 26 27 28 29
static void    snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
static void    snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
static void    snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
static void    snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
M
Minghao Li 已提交
30

31
//----------------------------------
M
Minghao Li 已提交
32
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
M
Minghao Li 已提交
33 34
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
M
Minghao Li 已提交
35

M
Minghao Li 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
  SSyncSnapshotSender *pSender = NULL;
  if (condition) {
    pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
    ASSERT(pSender != NULL);
    memset(pSender, 0, sizeof(*pSender));

    pSender->start = false;
    pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
    pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
    pSender->pReader = NULL;
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
    pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
    pSender->pSyncNode = pSyncNode;
    pSender->replicaIndex = replicaIndex;
    pSender->term = pSyncNode->pRaftStore->currentTerm;
M
Minghao Li 已提交
52
    pSender->privateTerm = taosGetTimestampMs() + 100;
53
    pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
54
    pSender->finish = false;
M
Minghao Li 已提交
55
  } else {
56
    sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
M
Minghao Li 已提交
57
  }
58

M
Minghao Li 已提交
59 60
  return pSender;
}
M
Minghao Li 已提交
61

M
Minghao Li 已提交
62 63
/*
 * Release a sender and everything it owns.
 *
 * It frees any cached block, then closes the FSM reader if one is open. A
 * failure to close the reader is logged, not asserted. Finally it frees the
 * sender itself. A NULL sender is accepted and ignored.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // drop the cached block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  // close the FSM reader
  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    if (pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader) != 0) {
      syncNodeErrorLog(pNode, "stop reader error");
    }
    pSender->pReader = NULL;
  }

  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
83

M
Minghao Li 已提交
84 85
// Whether this sender currently has a snapshot transfer in progress.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start ? true : false;
}

86 87 88 89 90 91
// begin send snapshot by param, snapshot, pReader
//
// action:
// 1. assert reader not start
// 2. update state
// 3. send first snapshot block
92 93
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
                            void *pReader) {
M
Minghao Li 已提交
94
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
95

96
  // init snapshot, parm, reader
M
Minghao Li 已提交
97
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
98 99
  pSender->pReader = pReader;
  pSender->snapshot = snapshot;
100
  pSender->snapshotParam = snapshotParam;
M
Minghao Li 已提交
101

102
  // init current block
M
Minghao Li 已提交
103 104 105 106 107
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }
  pSender->blockLen = 0;

108 109
  // update term
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
110
  ++(pSender->privateTerm);  // increase private term
111 112 113 114 115 116 117 118

  // update state
  pSender->finish = false;
  pSender->start = true;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // init last config
119
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
120 121
    int32_t         code = 0;
    SSyncRaftEntry *pEntry = NULL;
122 123
    bool            getLastConfig = false;

124 125
    code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                          pSender->snapshot.lastConfigIndex, &pEntry);
126
    if (code == 0 && pEntry != NULL) {
127 128 129 130 131 132 133 134 135 136 137 138 139
      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pEntry, &rpcMsg);

      SSyncCfg lastConfig;
      int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
      ASSERT(ret == 0);
      pSender->lastConfig = lastConfig;
      getLastConfig = true;

      rpcFreeCont(rpcMsg.pCont);
      syncEntryDestory(pEntry);
    } else {
      if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
140
        sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
141 142 143 144 145
        pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
        getLastConfig = true;
      }
    }

146
    // last config not found in wal, update to -1
147
    if (!getLastConfig) {
148 149 150
      SyncIndex oldLastConfigIndex = pSender->snapshot.lastConfigIndex;
      SyncIndex newLastConfigIndex = SYNC_INDEX_INVALID;
      pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
151
      memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
152 153 154 155

      // event log
      do {
        char logBuf[128];
156 157
        snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %" PRId64 " to %" PRId64,
                 oldLastConfigIndex, newLastConfigIndex);
158 159 160 161
        char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
        syncNodeEventLog(pSender->pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      } while (0);
162
    }
163 164

  } else {
165
    // no last config
166 167
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }
M
Minghao Li 已提交
168

M
Minghao Li 已提交
169
  // build begin msg
M
Minghao Li 已提交
170 171 172 173
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
174
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
175 176
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
177 178
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
179
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
180
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
181

M
Minghao Li 已提交
182
  // send msg
M
Minghao Li 已提交
183 184 185 186
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
  syncSnapshotSendDestroy(pMsg);
187 188 189

  // event log
  do {
190
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
191 192 193
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
194 195

  return 0;
M
Minghao Li 已提交
196 197
}

198
/*
 * Stop the sender.
 *
 * Closes the FSM reader (asserting success), drops the cached block, clears
 * `start` and records `finish`. term/privateTerm are intentionally left
 * untouched, so they can still be printed afterwards. Always returns 0.
 */
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  SSyncNode *pNode = pSender->pSyncNode;

  // close the FSM reader
  if (pSender->pReader != NULL) {
    int32_t stopCode = pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    ASSERT(stopCode == 0);
    pSender->pReader = NULL;
  }

  // drop the cached block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  pSender->start = false;
  pSender->finish = finish;

  char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
  syncNodeEventLog(pNode, eventLog);
  taosMemoryFree(eventLog);

  return 0;
}

229
// when sender receive ack, call this function to send msg from seq
M
Minghao Li 已提交
230
// seq = ack + 1, already updated
M
Minghao Li 已提交
231
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
232
  // free memory last time (current seq - 1)
M
Minghao Li 已提交
233 234
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
235
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
236 237 238 239 240 241 242
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
243 244 245
  if (pSender->blockLen > 0) {
    // has read data
  } else {
246
    // read finish, update seq to end
M
Minghao Li 已提交
247 248
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
249

M
Minghao Li 已提交
250
  // build msg
M
Minghao Li 已提交
251 252 253 254
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
255
  pMsg->beginIndex = pSender->snapshotParam.start;
M
Minghao Li 已提交
256 257
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
258 259
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
260
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
261
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
262 263
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
264
  // send msg
M
Minghao Li 已提交
265 266 267
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
268
  syncSnapshotSendDestroy(pMsg);
M
Minghao Li 已提交
269

270 271 272 273 274 275 276 277
  // event log
  do {
    char *eventLog = NULL;
    if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender finish");
    } else {
      eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender sending");
    }
M
Minghao Li 已提交
278 279
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
280
  } while (0);
M
Minghao Li 已提交
281

M
Minghao Li 已提交
282 283 284
  return 0;
}

M
Minghao Li 已提交
285 286
// send snapshot data from cache
//
// Re-sends the block cached by the last snapshotSend (pCurrentBlock/blockLen)
// under the current seq. Nothing is sent when no non-empty block is cached.
// Always returns 0.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  bool hasBlock = (pSender->pCurrentBlock != NULL) && (pSender->blockLen > 0);
  if (!hasBlock) {
    return 0;
  }

  SSyncNode        *pNode = pSender->pSyncNode;
  SyncSnapshotSend *pResend = syncSnapshotSendBuild(pSender->blockLen, pNode->vgId);
  pResend->srcId = pNode->myRaftId;
  pResend->destId = (pNode->replicasId)[pSender->replicaIndex];
  pResend->term = pNode->pRaftStore->currentTerm;
  pResend->beginIndex = pSender->snapshotParam.start;
  pResend->lastIndex = pSender->snapshot.lastApplyIndex;
  pResend->lastTerm = pSender->snapshot.lastApplyTerm;
  pResend->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pResend->lastConfig = pSender->lastConfig;
  pResend->seq = pSender->seq;
  pResend->privateTerm = pSender->privateTerm;
  memcpy(pResend->data, pSender->pCurrentBlock, pSender->blockLen);

  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pResend, &rpcMsg);
  syncNodeSendMsgById(&(pResend->destId), pNode, &rpcMsg);
  syncSnapshotSendDestroy(pResend);

  char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender resend");
  syncNodeEventLog(pNode, eventLog);
  taosMemoryFree(eventLog);

  return 0;
}

320 321 322 323 324 325
// Record the peer's ack for the seq currently in flight, then advance seq so
// the next snapshotSend sends ack + 1. The ack must equal the current seq.
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  ASSERT(pSender->seq == pMsg->ack);
  pSender->ack = pMsg->ack;
  pSender->seq += 1;
}

M
Minghao Li 已提交
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
/*
 * Build a cJSON tree describing the sender, wrapped as
 * {"SSyncSnapshotSender": {...}}. A NULL sender yields an empty inner object.
 * The caller owns the returned tree and releases it with cJSON_Delete.
 */
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pSender->start);
    cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
    cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
    cJSON_AddStringToObject(pRoot, "pReader", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      // The block dump gets its own key. Reusing "pCurrentBlock" would add a
      // second member with the same name as the pointer string above.
      char *s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock1", s);
      taosMemoryFree(s);
      s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
      taosMemoryFree(s);
    }

    // Index fields may hold SYNC_INDEX_INVALID (-1), so they are printed signed.
    // Explicit casts keep each argument matched to its format macro.
    cJSON *pSnapshot = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, (int64_t)pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, (uint64_t)pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
    cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);

    snprintf(u64buf, sizeof(u64buf), "%" PRId64, (int64_t)pSender->sendingMS);
    cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, (uint64_t)pSender->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, (uint64_t)pSender->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
    cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
  return pJson;
}

// Render the sender as pretty-printed JSON text. The intermediate cJSON tree is
// released here; the returned string is owned by the caller.
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *pText = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return pText;
}

M
Minghao Li 已提交
382 383
// One-line summary of the sender, prefixed with `event`, used for event logs.
// The text is formatted into a 256-byte taosMemoryMalloc buffer (snprintf
// truncates longer output). The caller frees it with taosMemoryFree.
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
  const int32_t bufLen = 256;
  char         *pBuf = taosMemoryMalloc(bufLen);

  // resolve the destination replica's host:port
  SRaftId  peerId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  char     peerHost[64];
  uint16_t peerPort;
  syncUtilU642Addr(peerId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(pBuf, bufLen,
           "%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64
           " lcindex:%" PRId64 " seq:%d ack:%d finish:%d pterm:%" PRIu64 " replica-index:%d %s:%d}",
           event, pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
           pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
           pSender->finish, pSender->privateTerm, pSender->replicaIndex, peerHost, peerPort);

  return pBuf;
}

M
Minghao Li 已提交
403
// -------------------------------------
404
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
M
Minghao Li 已提交
405 406
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
407

408
  SSyncSnapshotReceiver *pReceiver = NULL;
M
Minghao Li 已提交
409 410 411 412
  if (condition) {
    pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
    ASSERT(pReceiver != NULL);
    memset(pReceiver, 0, sizeof(*pReceiver));
413

M
Minghao Li 已提交
414 415 416 417
    pReceiver->start = false;
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
    pReceiver->pWriter = NULL;
    pReceiver->pSyncNode = pSyncNode;
418
    pReceiver->fromId = fromId;
M
Minghao Li 已提交
419
    pReceiver->term = pSyncNode->pRaftStore->currentTerm;
M
Minghao Li 已提交
420
    pReceiver->privateTerm = 0;
M
Minghao Li 已提交
421
    pReceiver->snapshot.data = NULL;
422
    pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
423
    pReceiver->snapshot.lastApplyTerm = 0;
424
    pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
425

M
Minghao Li 已提交
426
  } else {
427
    sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
M
Minghao Li 已提交
428
  }
429 430 431

  return pReceiver;
}
M
Minghao Li 已提交
432

433 434
/*
 * Release a receiver.
 *
 * If an FSM writer is still open, it is closed with `false` (asserting
 * success) and forgotten. Then the receiver itself is freed. A NULL receiver
 * is accepted and ignored.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  if (pReceiver->pWriter != NULL) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    stopCode =
        pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &(pReceiver->snapshot));
    ASSERT(stopCode == 0);
    pReceiver->pWriter = NULL;
  }

  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
447

448 449
// Whether this receiver currently has a snapshot transfer in progress.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start ? true : false;
}

450
// static do start by privateTerm, pBeginMsg
451
// receive first snapshot data
452
// write first block data
453
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
454
  // update state
M
Minghao Li 已提交
455
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
456
  pReceiver->privateTerm = pBeginMsg->privateTerm;
M
Minghao Li 已提交
457
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
M
Minghao Li 已提交
458
  pReceiver->fromId = pBeginMsg->srcId;
459
  pReceiver->start = true;
M
Minghao Li 已提交
460

461
  // update snapshot
M
Minghao Li 已提交
462 463 464
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
465 466
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
M
Minghao Li 已提交
467

468
  // start writer
M
Minghao Li 已提交
469
  ASSERT(pReceiver->pWriter == NULL);
470 471
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
                                                                 &(pReceiver->snapshotParam), &(pReceiver->pWriter));
M
Minghao Li 已提交
472
  ASSERT(ret == 0);
473 474 475 476 477 478 479

  // event log
  do {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
M
Minghao Li 已提交
480 481
}

482
// force stop
483 484 485
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
  // force close, abandon incomplete data
  if (pReceiver->pWriter != NULL) {
486 487
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                                  &(pReceiver->snapshot));
488 489 490 491 492
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
493 494 495 496 497 498 499

  // event log
  do {
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
500 501
}

M
Minghao Li 已提交
502 503
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
504
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
505
  if (!snapshotReceiverIsStart(pReceiver)) {
506
    // first start
507
    snapshotReceiverDoStart(pReceiver, pBeginMsg);
M
Minghao Li 已提交
508

509
  } else {
M
Minghao Li 已提交
510
    // already start
511
    sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
M
Minghao Li 已提交
512 513

    // force close, abandon incomplete data
514
    snapshotReceiverForceStop(pReceiver);
M
Minghao Li 已提交
515 516

    // start again
517
    snapshotReceiverDoStart(pReceiver, pBeginMsg);
518
  }
519 520

  return 0;
521
}
M
Minghao Li 已提交
522

523 524
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
525
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
M
Minghao Li 已提交
526
  if (pReceiver->pWriter != NULL) {
527 528
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                                  &(pReceiver->snapshot));
M
Minghao Li 已提交
529 530
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
531
  }
M
Minghao Li 已提交
532 533

  pReceiver->start = false;
534

535 536 537 538 539 540 541 542
  // event log
  do {
    SSnapshot snapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
543 544

  return 0;
545 546
}

547
// when recv last snapshot block, apply data into snapshot
548
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
549 550
  ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);

551
  int32_t code = 0;
552
  if (pReceiver->pWriter != NULL) {
553
    // write data
554 555 556
    if (pMsg->dataLen > 0) {
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                           pMsg->dataLen);
557 558 559 560 561 562 563 564 565 566 567 568
      if (code != 0) {
        syncNodeErrorLog(pReceiver->pSyncNode, "snapshot write error");
        return -1;
      }
    }

    // reset wal
    code =
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "wal restore from snapshot error");
      return -1;
569 570
    }

571 572 573 574 575
    // update commit index
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
    }

576
    // stop writer, apply data
577 578
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
                                                           &(pReceiver->snapshot));
579 580 581 582 583
    if (code != 0) {
      syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
      ASSERT(0);
      return -1;
    }
584 585
    pReceiver->pWriter = NULL;

586 587
    // update progress
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
588

589 590 591
  } else {
    syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
    return -1;
592 593 594 595 596 597 598 599 600 601
  }

  // event log
  do {
    SSnapshot snapshot;
    pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
    char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
    syncNodeEventLog(pReceiver->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);
602 603

  return 0;
604 605
}

606 607
// apply data block
// update progress
608 609 610 611 612
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  ASSERT(pMsg->seq == pReceiver->ack + 1);

  if (pReceiver->pWriter != NULL) {
    if (pMsg->dataLen > 0) {
613
      // apply data block
614 615 616 617
      int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   pMsg->data, pMsg->dataLen);
      ASSERT(code == 0);
    }
618 619

    // update progress
620 621 622 623 624 625 626 627
    pReceiver->ack = pMsg->seq;

    // event log
    do {
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
      syncNodeEventLog(pReceiver->pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);
628
  }
629 630 631 632 633
}

/*
 * Build a cJSON tree describing the receiver, wrapped as
 * {"SSyncSnapshotReceiver": {...}}. A NULL receiver yields an empty inner
 * object. The caller owns the returned tree and releases it with cJSON_Delete.
 */
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pReceiver != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
    cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pRoot, "pWriter", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);

    // fromId: raw address, its decoded host/port, and vgId
    cJSON *pFromId = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, (uint64_t)pReceiver->fromId.addr);
    cJSON_AddStringToObject(pFromId, "addr", u64buf);
    {
      uint64_t u64 = pReceiver->fromId.addr;
      char     host[128] = {0};
      uint16_t port = 0;
      syncUtilU642Addr(u64, host, sizeof(host), &port);
      cJSON_AddStringToObject(pFromId, "addr_host", host);
      cJSON_AddNumberToObject(pFromId, "addr_port", port);
    }
    cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pRoot, "fromId", pFromId);

    // Index fields may hold SYNC_INDEX_INVALID (-1), so they are printed signed.
    // Previously PRIu64 turned -1 into 18446744073709551615.
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, (int64_t)pReceiver->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, (uint64_t)pReceiver->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRId64, (int64_t)pReceiver->snapshot.lastConfigIndex);
    cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, (uint64_t)pReceiver->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, (uint64_t)pReceiver->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
  return pJson;
}

// Render the receiver as pretty-printed JSON text. The intermediate cJSON tree
// is released here; the returned string is owned by the caller.
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *pText = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return pText;
}
M
Minghao Li 已提交
687

M
Minghao Li 已提交
688 689
// One-line summary of the receiver, prefixed with `event`, used for event logs.
// The text is formatted into a 256-byte taosMemoryMalloc buffer (snprintf
// truncates longer output). The caller frees it with taosMemoryFree.
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
  const int32_t bufLen = 256;
  char         *pBuf = taosMemoryMalloc(bufLen);

  // resolve the sending peer's host:port
  char     peerHost[128];
  uint16_t peerPort;
  syncUtilU642Addr(pReceiver->fromId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(pBuf, bufLen,
           "%s {%p start:%d ack:%d term:%" PRIu64 " pterm:%" PRIu64 " from:%s:%d s-param:%" PRId64
           " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 "}",
           event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, peerHost,
           peerPort, pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex,
           pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);

  return pBuf;
}

// receiver on message
//
// condition 1: recv SYNC_SNAPSHOT_SEQ_BEGIN -> start the receiver, update privateTerm
// condition 2: recv SYNC_SNAPSHOT_SEQ_END -> finish the receiver (apply snapshot data, update commit index, maybe reconfig)
// condition 3: recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -> force-close the receiver
// condition 4: recv a data segment -> got data, update ack
//
// Handle a SyncSnapshotSend message on the snapshot receiver side.
//
// Returns 0 when the message was processed. A SyncSnapshotRsp is sent back for
// every accepted seq except SYNC_SNAPSHOT_SEQ_FORCE_CLOSE.
// Returns -1 when this node is not a follower, the message term differs from
// the current term, the seq is out of range, or the response cannot be built.
// A failure of snapshotReceiverFinish() does not change the return value; it
// is reported to the sender through the response's `code` field.
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // get receiver
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
  int32_t                code = 0;

  // only a follower accepts snapshot messages
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    do {
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);

    return -1;
  }

  // reject messages that do not belong to the current term
  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    do {
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);

    return -1;
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
    // condition 1: begin, no data
    snapshotReceiverStart(pReceiver, pMsg);
    needRsp = true;

  } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
    // condition 2: end, finish FSM; the receiver is only stopped on success
    code = snapshotReceiverFinish(pReceiver, pMsg);
    if (code == 0) {
      snapshotReceiverStop(pReceiver);
    }
    needRsp = true;

    // maybe update lastconfig
    // NOTE(review): this runs even when snapshotReceiverFinish() failed — confirm that is intended.
    if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
      // update new config myIndex
      SSyncCfg newSyncCfg = pMsg->lastConfig;
      syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);

      // do config change
      syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
    }

  } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
    // condition 3: force close, the sender gets no response
    snapshotReceiverForceStop(pReceiver);
    needRsp = false;

  } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
    // condition 4: transferring; only the next expected segment is stored,
    // but the current ack is reported back either way
    if (pMsg->seq == pReceiver->ack + 1) {
      snapshotReceiverGotData(pReceiver, pMsg);
    }
    needRsp = true;

  } else {
    // error log
    do {
      char logBuf[96];
      snprintf(logBuf, sizeof(logBuf), "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
      char *eventLog = snapshotReceiver2SimpleStr(pReceiver, logBuf);
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);

    return -1;
  }

  // send ack
  if (needRsp) {
    // build msg
    SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
    if (pRspMsg == NULL) {
      sError("vgId:%d, failed to build sync-snapshot-rsp", pSyncNode->vgId);
      return -1;
    }
    pRspMsg->srcId = pSyncNode->myRaftId;
    pRspMsg->destId = pMsg->srcId;
    pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
    pRspMsg->lastIndex = pMsg->lastIndex;
    pRspMsg->lastTerm = pMsg->lastTerm;
    pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
    pRspMsg->code = code;           // non-zero only if snapshotReceiverFinish() failed
    pRspMsg->privateTerm = pReceiver->privateTerm;  // receiver maybe already closed

    // send msg
    SRpcMsg rpcMsg;
    syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
    syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
    syncSnapshotRspDestroy(pRspMsg);
  }

  return 0;
}

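// sender on message
//
// condition 1: sender receives ack == SYNC_SNAPSHOT_SEQ_END -> close the sender
// condition 2: sender receives ack == its current seq -> update progress (seq = ack + 1) and send the next message;
//              an ack one behind the current seq triggers a resend of the last message instead
// condition 3: sender receives any other ack (or wrong term / not leader) -> just print an error log
//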
// Handle a SyncSnapshotRsp message on the snapshot sender side.
//
// Returns 0 when the response finished the transfer, advanced it, or triggered
// a resend. Returns -1 in any of these cases:
// - the peer was dropped from the raft group (checked on a leader only);
// - no snapshot sender exists for the peer;
// - this node is not the leader;
// - the term does not match;
// - the ack is unexpected.
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
    return -1;
  }

  // get sender
  // The drop check above only applies on a leader, so a sender lookup for an
  // unknown peer can presumably return NULL here. Fail instead of asserting:
  // ASSERT aborts in debug builds, and in release builds the NULL would be
  // dereferenced below.
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
  if (pSender == NULL) {
    sError("vgId:%d, recv sync-snapshot-rsp, snapshot sender not found", pSyncNode->vgId);
    return -1;
  }

  // only the leader drives a snapshot transfer
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    do {
      char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);

    return -1;
  }

  // ignore responses from another term
  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    do {
      char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
      syncNodeErrorLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } while (0);

    return -1;
  }

  // condition 1: receive ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    snapshotSenderStop(pSender, true);
    return 0;
  }

  // condition 2: the expected ack arrived, update sender ack and send next msg
  if (pMsg->ack == pSender->seq) {
    snapshotSenderUpdateProgress(pSender, pMsg);
    snapshotSend(pSender);
    return 0;
  }

  // the receiver is one step behind: maybe resend
  if (pMsg->ack == pSender->seq - 1) {
    snapshotReSend(pSender);
    return 0;
  }

  // condition 3: unexpected ack, error log
  do {
    char logBuf[96];
    snprintf(logBuf, sizeof(logBuf), "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
    char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
    syncNodeErrorLog(pSyncNode, eventLog);
    taosMemoryFree(eventLog);
  } while (0);

  return -1;
}