syncSnapshot.c 29.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
M
Minghao Li 已提交
25

26
//----------------------------------
M
Minghao Li 已提交
27
/*
 * Allocate and initialise a snapshot sender for the replica at `replicaIndex`.
 *
 * A sender is only created when the FSM supplies every callback this function
 * checks for (FpSnapshotStartRead, FpSnapshotStopRead, FpSnapshotDoRead and
 * FpGetSnapshot, which is invoked below).
 *
 * Returns the new sender, to be released with snapshotSenderDestroy(), or NULL
 * when a callback is missing or the allocation fails.
 */
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL) && (pSyncNode->pFsm->FpGetSnapshot != NULL);
  if (!condition) {
    sError("snapshotSenderCreate cannot create sender");
    return NULL;
  }

  SSyncSnapshotSender *pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    // do not rely on ASSERT here: with assertions disabled the memset below would write through NULL
    sError("snapshotSenderCreate cannot allocate sender");
    return NULL;
  }
  memset(pSender, 0, sizeof(*pSender));

  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
  pSender->term = pSyncNode->pRaftStore->currentTerm;
  // private term is seeded from the current timestamp; snapshotSenderStart() increments it per transfer
  pSender->privateTerm = taosGetTimestampMs() + 100;
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
  pSender->finish = false;

  return pSender;
}
M
Minghao Li 已提交
55

M
Minghao Li 已提交
56 57
/*
 * Release a snapshot sender and everything it still owns.
 *
 * A NULL sender is accepted and ignored. A snapshot reader that is still open
 * (the sender was never stopped) is closed through FpSnapshotStopRead so it is
 * not leaked, the cached block is freed, then the sender itself is freed.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // close reader left open by a transfer that was never stopped
  if (pSender->pReader != NULL) {
    int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    if (ret != 0) {
      sError("snapshotSenderDestroy stop read error, ret:%d", ret);
    }
    pSender->pReader = NULL;
  }

  // free current block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
64

M
Minghao Li 已提交
65 66
// Return the sender's `start` flag (set by snapshotSenderStart, cleared by snapshotSenderStop).
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

M
Minghao Li 已提交
67
// begin send snapshot (current term, seq begin)
M
Minghao Li 已提交
68 69
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71 72
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75
  // open snapshot reader
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
76 77 78
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
  ASSERT(ret == 0);

M
Minghao Li 已提交
79 80 81 82 83 84
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }

  pSender->blockLen = 0;

M
Minghao Li 已提交
85
  // get current snapshot info
M
Minghao Li 已提交
86
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
87
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
88
    /*
89
    SSyncRaftEntry *pEntry = NULL;
90 91
    int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                                  pSender->snapshot.lastConfigIndex, &pEntry);
92
    ASSERT(code == 0);
93 94 95 96 97 98
    ASSERT(pEntry != NULL);
    */

    SSyncRaftEntry *pEntry =
        pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex);
    ASSERT(pEntry != NULL);
99 100 101 102 103 104 105 106 107 108 109 110 111 112

    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);
    SSyncCfg lastConfig;
    int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
    ASSERT(ret == 0);
    pSender->lastConfig = lastConfig;

    rpcFreeCont(rpcMsg.pCont);
    syncEntryDestory(pEntry);

  } else {
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }
M
Minghao Li 已提交
113

M
Minghao Li 已提交
114 115 116
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);
117
  pSender->finish = false;
M
Minghao Li 已提交
118 119
  pSender->start = true;

M
Minghao Li 已提交
120
  // build begin msg
M
Minghao Li 已提交
121 122 123 124 125 126
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
127 128
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
129
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
130
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
131

M
Minghao Li 已提交
132
  // send msg
M
Minghao Li 已提交
133 134 135
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
136

M
Minghao Li 已提交
137 138 139
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
140 141 142

  if (gRaftDetailLog) {
    char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
143 144
    sDebug(
        "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
145
        "lastConfigIndex:%ld send "
146 147
        "msg:%s",
        pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
148
        pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
149 150
    taosMemoryFree(msgStr);
  } else {
M
Minghao Li 已提交
151 152
    sDebug(
        "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
153 154 155
        "lastConfigIndex:%ld",
        pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
        pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
156
  }
M
Minghao Li 已提交
157

M
Minghao Li 已提交
158 159 160
  syncSnapshotSendDestroy(pMsg);
}

161
#if 0
// Disabled legacy variant of snapshotSenderStart, excluded from compilation by `#if 0`.
// NOTE(review): it calls snapshotSenderDoStart, which is not defined in the visible part of this file,
// so it cannot simply be re-enabled; consider deleting this block.

// when entry in snapshot, start sender
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  if (!(pSender->start)) {
    // start
    snapshotSenderDoStart(pSender);
    pSender->start = true;
  } else {
    // already start
    ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term);

    // if current term is higher, need start again
    if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) {
      // force peer rollback
      SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
      pMsg->srcId = pSender->pSyncNode->myRaftId;
      pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
      pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
      pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
      pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
      pMsg->seq = SYNC_SNAPSHOT_SEQ_FORCE_CLOSE;

      SRpcMsg rpcMsg;
      syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
      syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

      char *msgStr = syncSnapshotSend2Str(pMsg);
      sTrace("snapshot send force close seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
      taosMemoryFree(msgStr);

      syncSnapshotSendDestroy(pMsg);

      // close reader
      int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
      ASSERT(ret == 0);
      pSender->pReader = NULL;

      // start again
      snapshotSenderDoStart(pSender);
      pSender->start = true;
    } else {
      // current term, do nothing
      ASSERT(pSender->pSyncNode->pRaftStore->currentTerm == pSender->term);
    }
  }

  char *s = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStart %s", s);
  taosMemoryFree(s);
}
#endif
M
Minghao Li 已提交
212 213

/*
 * Stop a snapshot sender: close the FSM snapshot reader if one is open, free the
 * cached block if there is one, and clear the `start` flag. When detailed raft
 * logging is enabled the resulting sender state is logged.
 */
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
  // close reader
  if (pSender->pReader != NULL) {
    int32_t stopCode = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(stopCode == 0);
    pSender->pReader = NULL;
  }

  // free cached block; blockLen is only reset together with the block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->blockLen = 0;
    pSender->pCurrentBlock = NULL;
  }

  pSender->start = false;

  if (gRaftDetailLog) {
    char *senderStr = snapshotSender2Str(pSender);
    sInfo("snapshotSenderStop %s", senderStr);
    taosMemoryFree(senderStr);
  }
}

M
Minghao Li 已提交
235 236
// when sender receiver ack, call this function to send msg from seq
// seq = ack + 1, already updated
M
Minghao Li 已提交
237 238 239 240
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
241
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
242 243 244 245 246 247 248
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
249 250 251 252 253 254
  if (pSender->blockLen > 0) {
    // has read data
  } else {
    // read finish
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
255

M
Minghao Li 已提交
256
  // build msg
M
Minghao Li 已提交
257 258 259 260 261 262
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
263 264
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
265
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
266
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
267 268
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
269
  // send msg
M
Minghao Li 已提交
270 271 272
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
273

M
Minghao Li 已提交
274 275 276
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
277

M
Minghao Li 已提交
278
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
279 280
    if (gRaftDetailLog) {
      char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
281 282
      sDebug(
          "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
283
          "lastConfigIndex:%ld send "
284 285
          "msg:%s",
          pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
286
          pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
287 288
      taosMemoryFree(msgStr);
    } else {
M
Minghao Li 已提交
289 290
      sDebug(
          "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
291 292 293
          "lastConfigIndex:%ld",
          pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
          pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
294
    }
M
Minghao Li 已提交
295
  } else {
M
Minghao Li 已提交
296 297
    sDebug(
        "vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
298 299 300
        "lastConfigIndex:%ld",
        pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
        pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
M
Minghao Li 已提交
301 302
  }

M
Minghao Li 已提交
303 304 305 306
  syncSnapshotSendDestroy(pMsg);
  return 0;
}

M
Minghao Li 已提交
307 308 309 310 311 312 313 314 315
/*
 * Send the cached snapshot block again (same seq), without reading from the FSM.
 * Does nothing when no block is cached. Always returns 0.
 */
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock != NULL) {
    SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
    pMsg->srcId = pSender->pSyncNode->myRaftId;
    pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
    pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
    pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
    pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
    pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
    pMsg->lastConfig = pSender->lastConfig;
    pMsg->seq = pSender->seq;
    // carry the private term like snapshotSend() and snapshotSenderStart() do, so a resent
    // message is tagged the same way as the original one
    pMsg->privateTerm = pSender->privateTerm;
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

    SRpcMsg rpcMsg;
    syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
    syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

    char     host[128];
    uint16_t port;
    syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);

    if (gRaftDetailLog) {
      char *msgStr = syncSnapshotSend2Str(pMsg);
      sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId,
             host, port, pSender->seq, pSender->ack, msgStr);
      taosMemoryFree(msgStr);
    } else {
      sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port,
             pSender->seq, pSender->ack);
    }

    syncSnapshotSendDestroy(pMsg);
  }
  return 0;
}

M
Minghao Li 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
/*
 * Build a cJSON dump of a snapshot sender.
 *
 * The returned object has a single member, "SSyncSnapshotSender", holding the
 * sender's fields; that member is an empty object when pSender is NULL.
 * snapshotSender2Str() releases the result with cJSON_Delete().
 */
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   textBuf[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pFields, "start", pSender->start);
    cJSON_AddNumberToObject(pFields, "seq", pSender->seq);
    cJSON_AddNumberToObject(pFields, "ack", pSender->ack);

    // pointers are rendered as strings
    snprintf(textBuf, sizeof(textBuf), "%p", pSender->pReader);
    cJSON_AddStringToObject(pFields, "pReader", textBuf);

    snprintf(textBuf, sizeof(textBuf), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pFields, "pCurrentBlock", textBuf);
    cJSON_AddNumberToObject(pFields, "blockLen", pSender->blockLen);

    // NOTE(review): the key "pCurrentBlock" is added a second time here (block content after the
    // pointer value above), so the object carries a duplicate key; kept as-is to preserve output
    if (pSender->pCurrentBlock != NULL) {
      char *blockText = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock", blockText);
      taosMemoryFree(blockText);

      char *blockText2 = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock2", blockText2);
      taosMemoryFree(blockText2);
    }

    // nested "snapshot" object
    cJSON *pSnapshotJson = cJSON_CreateObject();
    snprintf(textBuf, sizeof(textBuf), "%lu", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnapshotJson, "lastApplyIndex", textBuf);
    snprintf(textBuf, sizeof(textBuf), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnapshotJson, "lastApplyTerm", textBuf);
    cJSON_AddItemToObject(pFields, "snapshot", pSnapshotJson);

    snprintf(textBuf, sizeof(textBuf), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pFields, "sendingMS", textBuf);

    snprintf(textBuf, sizeof(textBuf), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", textBuf);

    cJSON_AddNumberToObject(pFields, "replicaIndex", pSender->replicaIndex);

    snprintf(textBuf, sizeof(textBuf), "%lu", pSender->term);
    cJSON_AddStringToObject(pFields, "term", textBuf);

    snprintf(textBuf, sizeof(textBuf), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pFields, "privateTerm", textBuf);

    cJSON_AddNumberToObject(pFields, "finish", pSender->finish);
  }

  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotSender", pFields);
  return pWrapper;
}

// Render a snapshot sender as the text produced by cJSON_Print() on its JSON dump.
// The intermediate cJSON tree is deleted here; callers in this file release the returned
// string with taosMemoryFree().
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pDump = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pDump);

  cJSON_Delete(pDump);
  return text;
}

// -------------------------------------
402
/*
 * Allocate and initialise a snapshot receiver for messages coming from `fromId`.
 *
 * A receiver is only created when the FSM supplies FpSnapshotStartWrite,
 * FpSnapshotStopWrite and FpSnapshotDoWrite.
 *
 * Returns the new receiver, to be released with snapshotReceiverDestroy(), or
 * NULL when a callback is missing or the allocation fails.
 */
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
  if (!condition) {
    sInfo("snapshotReceiverCreate cannot create receiver");
    return NULL;
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    // do not rely on ASSERT here: with assertions disabled the memset below would write through NULL
    sError("snapshotReceiverCreate cannot allocate receiver");
    return NULL;
  }
  memset(pReceiver, 0, sizeof(*pReceiver));

  pReceiver->start = false;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;
  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = pSyncNode->pRaftStore->currentTerm;
  pReceiver->privateTerm = 0;

  return pReceiver;
}
M
Minghao Li 已提交
426

427 428 429 430 431
/*
 * Release a snapshot receiver.
 *
 * A NULL receiver is accepted and ignored. A snapshot writer that is still open
 * (the receiver was never stopped) is closed through FpSnapshotStopWrite with
 * its last argument false, as the other abandon paths in this file do, so it is
 * not leaked; then the receiver itself is freed.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer left open by a transfer that never finished
  if (pReceiver->pWriter != NULL) {
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    if (ret != 0) {
      sError("snapshotReceiverDestroy stop write error, ret:%d", ret);
    }
    pReceiver->pWriter = NULL;
  }

  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
432

433 434
// Return the receiver's `start` flag (set by snapshotReceiverStart, cleared by snapshotReceiverStop).
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

M
Minghao Li 已提交
435
// begin receive snapshot msg (current term, seq begin)
436
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
M
Minghao Li 已提交
437
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
438
  pReceiver->privateTerm = privateTerm;
M
Minghao Li 已提交
439
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
440
  pReceiver->fromId = fromId;
M
Minghao Li 已提交
441 442 443 444 445 446 447 448

  ASSERT(pReceiver->pWriter == NULL);
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
  ASSERT(ret == 0);
}

// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
449
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
450
  if (!snapshotReceiverIsStart(pReceiver)) {
M
Minghao Li 已提交
451
    // start
452
    snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
M
Minghao Li 已提交
453
    pReceiver->start = true;
M
Minghao Li 已提交
454

455
  } else {
M
Minghao Li 已提交
456
    // already start
457
    sInfo("snapshot recv, receiver already start");
M
Minghao Li 已提交
458 459 460 461 462 463 464 465

    // force close, abandon incomplete data
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;

    // start again
466
    snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
M
Minghao Li 已提交
467
    pReceiver->start = true;
468
  }
M
Minghao Li 已提交
469

470 471 472 473 474
  if (gRaftDetailLog) {
    char *s = snapshotReceiver2Str(pReceiver);
    sInfo("snapshotReceiverStart %s", s);
    taosMemoryFree(s);
  }
475
}
M
Minghao Li 已提交
476

477
/*
 * Stop the receiver: close the snapshot writer if one is still open, clear the
 * `start` flag and, when `apply` is true, increment privateTerm. When detailed
 * raft logging is enabled the resulting receiver state is logged.
 *
 * NOTE(review): a writer that is still open is always closed with `false` here,
 * whatever `apply` is; `apply` only controls the privateTerm increment.
 */
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
  // close writer
  if (pReceiver->pWriter != NULL) {
    int32_t stopCode =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(stopCode == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  if (apply) {
    ++(pReceiver->privateTerm);
  }

  if (gRaftDetailLog) {
    char *receiverStr = snapshotReceiver2Str(pReceiver);
    sInfo("snapshotReceiverStop %s", receiverStr);
    taosMemoryFree(receiverStr);
  }
}

/*
 * Build a cJSON dump of a snapshot receiver.
 *
 * The returned object has a single member, "SSyncSnapshotReceiver", holding the
 * receiver's fields; that member is an empty object when pReceiver is NULL.
 * snapshotReceiver2Str() releases the result with cJSON_Delete().
 */
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   textBuf[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pReceiver != NULL) {
    cJSON_AddNumberToObject(pFields, "start", pReceiver->start);
    cJSON_AddNumberToObject(pFields, "ack", pReceiver->ack);

    // pointers are rendered as strings
    snprintf(textBuf, sizeof(textBuf), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pFields, "pWriter", textBuf);

    snprintf(textBuf, sizeof(textBuf), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", textBuf);

    // nested "fromId" object: raw addr, addr decoded to host/port, and vgId
    cJSON *pFromIdJson = cJSON_CreateObject();
    snprintf(textBuf, sizeof(textBuf), "%lu", pReceiver->fromId.addr);
    cJSON_AddStringToObject(pFromIdJson, "addr", textBuf);

    char     host[128] = {0};
    uint16_t port;
    uint64_t rawAddr = pReceiver->fromId.addr;
    syncUtilU642Addr(rawAddr, host, sizeof(host), &port);
    cJSON_AddStringToObject(pFromIdJson, "addr_host", host);
    cJSON_AddNumberToObject(pFromIdJson, "addr_port", port);

    cJSON_AddNumberToObject(pFromIdJson, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pFields, "fromId", pFromIdJson);

    snprintf(textBuf, sizeof(textBuf), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pFields, "term", textBuf);

    snprintf(textBuf, sizeof(textBuf), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pFields, "privateTerm", textBuf);
  }

  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotReceiver", pFields);
  return pWrapper;
}

// Render a snapshot receiver as the text produced by cJSON_Print() on its JSON dump.
// The intermediate cJSON tree is deleted here; callers in this file release the returned
// string with taosMemoryFree().
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pDump = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pDump);

  cJSON_Delete(pDump);
  return text;
}
M
Minghao Li 已提交
545

546
// receiver do something
M
Minghao Li 已提交
547
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
548
  // get receiver
549 550
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
551
  int32_t                writeCode = 0;
M
Minghao Li 已提交
552

553 554
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
555 556 557
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        // begin
558
        snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId);
M
Minghao Li 已提交
559 560 561
        pReceiver->ack = pMsg->seq;
        needRsp = true;

M
Minghao Li 已提交
562 563 564
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
565 566 567

        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
568 569 570 571 572
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld, recv msg:%s",
              pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
              msgStr);
573 574
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
575 576 577 578
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld",
              pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
579
        }
M
Minghao Li 已提交
580

M
Minghao Li 已提交
581 582
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
583 584 585
        writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
        ASSERT(writeCode == 0);

M
Minghao Li 已提交
586
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
M
Minghao Li 已提交
587
        pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
588

589 590
        // maybe update lastconfig
        if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
591 592
          int32_t oldReplicaNum = pSyncNode->replicaNum;

593 594 595 596 597 598 599 600 601 602 603 604
          // update new config myIndex
          bool     IamInNew = false;
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
            if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
                pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
              newSyncCfg.myIndex = i;
              IamInNew = true;
              break;
            }
          }

605
          bool isDrop;
606
          if (IamInNew) {
M
Minghao Li 已提交
607
            sDebug("vgId:%d sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ",
608
                   pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
609 610
            syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
          } else {
M
Minghao Li 已提交
611 612
            sDebug(
                "vgId:%d sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, "
613 614 615 616 617 618
                "lastConfigIndex:%ld ",
                pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
          }

          // change isStandBy to normal
          if (!isDrop) {
619 620
            char tmpbuf[128];
            snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum);
621
            if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
622
              syncNodeBecomeLeader(pSyncNode, tmpbuf);
623
            } else {
624
              syncNodeBecomeFollower(pSyncNode, tmpbuf);
625
            }
626
          }
627 628
        }

M
Minghao Li 已提交
629 630
        SSnapshot snapshot;
        pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
631

M
Minghao Li 已提交
632 633 634
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
635 636 637

        if (gRaftDetailLog) {
          char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
M
Minghao Li 已提交
638 639
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
640
              "snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
641
              "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, raft log:%s",
642
              pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
M
Minghao Li 已提交
643
              snapshot.lastConfigIndex, logSimpleStr);
644 645
          taosMemoryFree(logSimpleStr);
        } else {
M
Minghao Li 已提交
646 647
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
648
              "snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
649 650 651
              "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld",
              pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
              snapshot.lastConfigIndex);
652
        }
M
Minghao Li 已提交
653

M
Minghao Li 已提交
654
        pReceiver->pWriter = NULL;
655
        snapshotReceiverStop(pReceiver, true);
M
Minghao Li 已提交
656
        pReceiver->ack = pMsg->seq;
M
Minghao Li 已提交
657
        needRsp = true;
M
Minghao Li 已提交
658

659 660
        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
661 662 663 664 665
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld, recv msg:%s",
              pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex, msgStr);
666 667
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
668 669 670 671 672
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld",
              pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex);
673
        }
M
Minghao Li 已提交
674

M
Minghao Li 已提交
675 676
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
677
        snapshotReceiverStop(pReceiver, false);
M
Minghao Li 已提交
678 679
        needRsp = false;

M
Minghao Li 已提交
680 681 682 683
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);

684 685
        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
686 687 688
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld, recv "
689
              "msg:%s",
M
Minghao Li 已提交
690 691
              pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex, msgStr);
692 693
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
694 695 696 697 698
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld",
              pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex);
699
        }
M
Minghao Li 已提交
700

M
Minghao Li 已提交
701 702 703
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
704 705 706
          writeCode =
              pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
          ASSERT(writeCode == 0);
M
Minghao Li 已提交
707 708 709 710
          pReceiver->ack = pMsg->seq;
        }
        needRsp = true;

M
Minghao Li 已提交
711 712 713
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
714 715 716

        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
717 718 719 720 721
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld, recv msg:%s",
              pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
              msgStr);
722 723
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
724 725 726 727
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld",
              pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
728
        }
M
Minghao Li 已提交
729

M
Minghao Li 已提交
730 731
      } else {
        ASSERT(0);
732
      }
M
Minghao Li 已提交
733

M
Minghao Li 已提交
734 735 736 737 738 739 740 741
      if (needRsp) {
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
        pRspMsg->ack = pReceiver->ack;
742
        pRspMsg->code = writeCode;
M
Minghao Li 已提交
743
        pRspMsg->privateTerm = pReceiver->privateTerm;
M
Minghao Li 已提交
744 745 746 747

        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
M
Minghao Li 已提交
748

M
Minghao Li 已提交
749 750
        syncSnapshotRspDestroy(pRspMsg);
      }
M
Minghao Li 已提交
751
    }
M
Minghao Li 已提交
752 753
  } else {
    syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
M
Minghao Li 已提交
754
  }
M
Minghao Li 已提交
755

M
Minghao Li 已提交
756 757 758
  return 0;
}

M
Minghao Li 已提交
759 760
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
761
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
762 763 764 765 766 767
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    sInfo("recv SyncSnapshotRsp maybe replica already dropped");
    return 0;
  }

M
Minghao Li 已提交
768
  // get sender
769
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
770 771 772 773 774
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
775 776
      // receiver ack is finish, close sender
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
777
        pSender->finish = true;
M
Minghao Li 已提交
778 779 780 781 782
        snapshotSenderStop(pSender);
        return 0;
      }

      // send next msg
783
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
784
        // update sender ack
785 786
        pSender->ack = pMsg->ack;
        (pSender->seq)++;
M
Minghao Li 已提交
787
        snapshotSend(pSender);
788

M
Minghao Li 已提交
789 790
      } else if (pMsg->ack == pSender->seq - 1) {
        snapshotReSend(pSender);
791

M
Minghao Li 已提交
792 793
      } else {
        ASSERT(0);
794 795
      }
    }
M
Minghao Li 已提交
796 797
  } else {
    syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
798 799 800
  }

  return 0;
801
}