syncSnapshot.c 29.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
M
Minghao Li 已提交
25

26
//----------------------------------
M
Minghao Li 已提交
27
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
M
Minghao Li 已提交
28 29
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
M
Minghao Li 已提交
30

M
Minghao Li 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
  SSyncSnapshotSender *pSender = NULL;
  if (condition) {
    pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
    ASSERT(pSender != NULL);
    memset(pSender, 0, sizeof(*pSender));

    pSender->start = false;
    pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
    pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
    pSender->pReader = NULL;
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
    pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
    pSender->pSyncNode = pSyncNode;
    pSender->replicaIndex = replicaIndex;
    pSender->term = pSyncNode->pRaftStore->currentTerm;
M
Minghao Li 已提交
47
    pSender->privateTerm = taosGetTimestampMs() + 100;
M
Minghao Li 已提交
48
    pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
49
    pSender->finish = false;
M
Minghao Li 已提交
50
  } else {
M
Minghao Li 已提交
51
    sError("snapshotSenderCreate cannot create sender");
M
Minghao Li 已提交
52
  }
M
Minghao Li 已提交
53 54
  return pSender;
}
M
Minghao Li 已提交
55

M
Minghao Li 已提交
56 57
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender != NULL) {
M
Minghao Li 已提交
58 59 60
    if (pSender->pCurrentBlock != NULL) {
      taosMemoryFree(pSender->pCurrentBlock);
    }
M
Minghao Li 已提交
61 62 63
    taosMemoryFree(pSender);
  }
}
M
Minghao Li 已提交
64

M
Minghao Li 已提交
65 66
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }

M
Minghao Li 已提交
67
// begin send snapshot (current term, seq begin)
M
Minghao Li 已提交
68 69
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71 72
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75
  // open snapshot reader
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
76 77 78
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
  ASSERT(ret == 0);

M
Minghao Li 已提交
79 80 81 82 83 84
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }

  pSender->blockLen = 0;

M
Minghao Li 已提交
85
  // get current snapshot info
M
Minghao Li 已提交
86
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
87
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
88
    /*
89
    SSyncRaftEntry *pEntry = NULL;
90 91
    int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                                  pSender->snapshot.lastConfigIndex, &pEntry);
92
    ASSERT(code == 0);
93 94 95 96 97 98
    ASSERT(pEntry != NULL);
    */

    SSyncRaftEntry *pEntry =
        pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex);
    ASSERT(pEntry != NULL);
99 100 101 102 103 104 105 106 107 108 109 110 111 112

    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);
    SSyncCfg lastConfig;
    int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
    ASSERT(ret == 0);
    pSender->lastConfig = lastConfig;

    rpcFreeCont(rpcMsg.pCont);
    syncEntryDestory(pEntry);

  } else {
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }
M
Minghao Li 已提交
113

M
Minghao Li 已提交
114 115 116
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);
117
  pSender->finish = false;
M
Minghao Li 已提交
118 119
  pSender->start = true;

M
Minghao Li 已提交
120
  // build begin msg
M
Minghao Li 已提交
121 122 123 124 125 126
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
127 128
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
129
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
130
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
131

M
Minghao Li 已提交
132
  // send msg
M
Minghao Li 已提交
133 134 135
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
136

M
Minghao Li 已提交
137 138 139
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
140 141 142

  if (gRaftDetailLog) {
    char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
143 144
    sDebug(
        "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
145
        "lastConfigIndex:%ld send "
146 147
        "msg:%s",
        pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
148
        pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
149 150
    taosMemoryFree(msgStr);
  } else {
M
Minghao Li 已提交
151 152
    sDebug(
        "vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
153 154 155
        "lastConfigIndex:%ld",
        pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
        pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
156
  }
M
Minghao Li 已提交
157

M
Minghao Li 已提交
158 159 160
  syncSnapshotSendDestroy(pMsg);
}

161
#if 0
M
Minghao Li 已提交
162
// when entry in snapshot, start sender
M
Minghao Li 已提交
163 164
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  if (!(pSender->start)) {
M
Minghao Li 已提交
165
    // start
M
Minghao Li 已提交
166 167 168
    snapshotSenderDoStart(pSender);
    pSender->start = true;
  } else {
M
Minghao Li 已提交
169
    // already start
M
Minghao Li 已提交
170 171
    ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term);

M
Minghao Li 已提交
172
    // if current term is higher, need start again
M
Minghao Li 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185
    if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) {
      // force peer rollback
      SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
      pMsg->srcId = pSender->pSyncNode->myRaftId;
      pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
      pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
      pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
      pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
      pMsg->seq = SYNC_SNAPSHOT_SEQ_FORCE_CLOSE;

      SRpcMsg rpcMsg;
      syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
      syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
186 187

      char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
188
      sTrace("snapshot send force close seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
M
Minghao Li 已提交
189 190
      taosMemoryFree(msgStr);

M
Minghao Li 已提交
191 192 193 194 195
      syncSnapshotSendDestroy(pMsg);

      // close reader
      int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
      ASSERT(ret == 0);
M
Minghao Li 已提交
196
      pSender->pReader = NULL;
M
Minghao Li 已提交
197 198 199

      // start again
      snapshotSenderDoStart(pSender);
M
Minghao Li 已提交
200
      pSender->start = true;
M
Minghao Li 已提交
201
    } else {
M
Minghao Li 已提交
202 203
      // current term, do nothing
      ASSERT(pSender->pSyncNode->pRaftStore->currentTerm == pSender->term);
M
Minghao Li 已提交
204 205
    }
  }
M
Minghao Li 已提交
206 207 208 209

  char *s = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStart %s", s);
  taosMemoryFree(s);
M
Minghao Li 已提交
210
}
211
#endif
M
Minghao Li 已提交
212 213

void snapshotSenderStop(SSyncSnapshotSender *pSender) {
M
Minghao Li 已提交
214 215 216 217 218
  if (pSender->pReader != NULL) {
    int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(ret == 0);
    pSender->pReader = NULL;
  }
M
Minghao Li 已提交
219 220 221

  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
222
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
223 224
    pSender->blockLen = 0;
  }
M
Minghao Li 已提交
225 226

  pSender->start = false;
M
Minghao Li 已提交
227

228 229 230 231 232
  if (gRaftDetailLog) {
    char *s = snapshotSender2Str(pSender);
    sInfo("snapshotSenderStop %s", s);
    taosMemoryFree(s);
  }
M
Minghao Li 已提交
233 234
}

M
Minghao Li 已提交
235 236
// when sender receiver ack, call this function to send msg from seq
// seq = ack + 1, already updated
M
Minghao Li 已提交
237 238 239 240
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
241
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
242 243 244 245 246 247 248
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
249 250 251 252 253 254
  if (pSender->blockLen > 0) {
    // has read data
  } else {
    // read finish
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
255

M
Minghao Li 已提交
256
  // build msg
M
Minghao Li 已提交
257 258 259 260 261 262
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
263 264
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
265
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
266
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
267 268
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
269
  // send msg
M
Minghao Li 已提交
270 271 272
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
273

M
Minghao Li 已提交
274 275 276
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
277

M
Minghao Li 已提交
278
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
279 280
    if (gRaftDetailLog) {
      char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
281 282
      sDebug(
          "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
283
          "lastConfigIndex:%ld send "
284 285
          "msg:%s",
          pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
286
          pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
287 288
      taosMemoryFree(msgStr);
    } else {
M
Minghao Li 已提交
289 290
      sDebug(
          "vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
291 292 293
          "lastConfigIndex:%ld",
          pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
          pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
294
    }
M
Minghao Li 已提交
295
  } else {
M
Minghao Li 已提交
296 297
    sDebug(
        "vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
298 299 300
        "lastConfigIndex:%ld",
        pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
        pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
M
Minghao Li 已提交
301 302
  }

M
Minghao Li 已提交
303 304 305 306
  syncSnapshotSendDestroy(pMsg);
  return 0;
}

M
Minghao Li 已提交
307 308 309 310 311 312 313 314 315
// send snapshot data from cache
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock != NULL) {
    SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
    pMsg->srcId = pSender->pSyncNode->myRaftId;
    pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
    pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
    pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
    pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
316 317
    pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
    pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
318 319 320 321 322 323
    pMsg->seq = pSender->seq;
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

    SRpcMsg rpcMsg;
    syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
    syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
324

M
Minghao Li 已提交
325 326 327
    char     host[128];
    uint16_t port;
    syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
328 329 330

    if (gRaftDetailLog) {
      char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
331
      sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId,
332 333 334
             host, port, pSender->seq, pSender->ack, msgStr);
      taosMemoryFree(msgStr);
    } else {
M
Minghao Li 已提交
335
      sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port,
336 337
             pSender->seq, pSender->ack);
    }
M
Minghao Li 已提交
338

M
Minghao Li 已提交
339 340 341 342 343
    syncSnapshotSendDestroy(pMsg);
  }
  return 0;
}

M
Minghao Li 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pSender->start);
    cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
    cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
    cJSON_AddStringToObject(pRoot, "pReader", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      char *s;
      s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
      taosMemoryFree(s);
      s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
      taosMemoryFree(s);
    }

    cJSON *pSnapshot = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyIndex);
M
Minghao Li 已提交
372
    cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
M
Minghao Li 已提交
373
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyTerm);
M
Minghao Li 已提交
374
    cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
M
Minghao Li 已提交
375 376 377 378 379 380 381 382 383
    cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);

    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
384 385
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
386
    cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
M
Minghao Li 已提交
387 388 389 390 391 392 393 394 395
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
  return pJson;
}

char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pJson = snapshotSender2Json(pSender);
396
  char * serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
397 398 399 400 401
  cJSON_Delete(pJson);
  return serialized;
}

// -------------------------------------
402
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
M
Minghao Li 已提交
403 404
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
405

406
  SSyncSnapshotReceiver *pReceiver = NULL;
M
Minghao Li 已提交
407 408 409 410
  if (condition) {
    pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
    ASSERT(pReceiver != NULL);
    memset(pReceiver, 0, sizeof(*pReceiver));
411

M
Minghao Li 已提交
412 413 414 415
    pReceiver->start = false;
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
    pReceiver->pWriter = NULL;
    pReceiver->pSyncNode = pSyncNode;
416
    pReceiver->fromId = fromId;
M
Minghao Li 已提交
417
    pReceiver->term = pSyncNode->pRaftStore->currentTerm;
M
Minghao Li 已提交
418 419
    pReceiver->privateTerm = 0;

M
Minghao Li 已提交
420 421 422
  } else {
    sInfo("snapshotReceiverCreate cannot create receiver");
  }
423 424 425

  return pReceiver;
}
M
Minghao Li 已提交
426

427 428 429 430 431
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver != NULL) {
    taosMemoryFree(pReceiver);
  }
}
M
Minghao Li 已提交
432

433 434
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }

M
Minghao Li 已提交
435
// begin receive snapshot msg (current term, seq begin)
436
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
M
Minghao Li 已提交
437
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
438
  pReceiver->privateTerm = privateTerm;
M
Minghao Li 已提交
439
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
440
  pReceiver->fromId = fromId;
M
Minghao Li 已提交
441 442 443 444 445 446 447 448

  ASSERT(pReceiver->pWriter == NULL);
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
  ASSERT(ret == 0);
}

// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
449
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
450
  if (!snapshotReceiverIsStart(pReceiver)) {
M
Minghao Li 已提交
451
    // start
452
    snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
M
Minghao Li 已提交
453
    pReceiver->start = true;
M
Minghao Li 已提交
454

455
  } else {
M
Minghao Li 已提交
456
    // already start
457
    sInfo("snapshot recv, receiver already start");
M
Minghao Li 已提交
458 459 460 461 462 463 464 465

    // force close, abandon incomplete data
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;

    // start again
466
    snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
M
Minghao Li 已提交
467
    pReceiver->start = true;
468
  }
M
Minghao Li 已提交
469

470 471 472 473 474
  if (gRaftDetailLog) {
    char *s = snapshotReceiver2Str(pReceiver);
    sInfo("snapshotReceiverStart %s", s);
    taosMemoryFree(s);
  }
475
}
M
Minghao Li 已提交
476

477
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
M
Minghao Li 已提交
478 479 480 481 482
  if (pReceiver->pWriter != NULL) {
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
483
  }
M
Minghao Li 已提交
484 485

  pReceiver->start = false;
486 487 488 489

  if (apply) {
    ++(pReceiver->privateTerm);
  }
M
Minghao Li 已提交
490

491 492 493 494 495
  if (gRaftDetailLog) {
    char *s = snapshotReceiver2Str(pReceiver);
    sInfo("snapshotReceiverStop %s", s);
    taosMemoryFree(s);
  }
496 497 498 499 500
}

cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();
M
Minghao Li 已提交
501

502 503 504
  if (pReceiver != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
    cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);
M
Minghao Li 已提交
505

506 507 508 509 510
    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pRoot, "pWriter", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
511 512 513 514 515 516

    cJSON *pFromId = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->fromId.addr);
    cJSON_AddStringToObject(pFromId, "addr", u64buf);
    {
      uint64_t u64 = pReceiver->fromId.addr;
517
      cJSON *  pTmp = pFromId;
518 519 520 521 522 523 524 525 526
      char     host[128] = {0};
      uint16_t port;
      syncUtilU642Addr(u64, host, sizeof(host), &port);
      cJSON_AddStringToObject(pTmp, "addr_host", host);
      cJSON_AddNumberToObject(pTmp, "addr_port", port);
    }
    cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pRoot, "fromId", pFromId);

527 528
    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
529 530 531

    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
532 533 534 535 536 537 538 539 540
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
  return pJson;
}

char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pJson = snapshotReceiver2Json(pReceiver);
541
  char * serialized = cJSON_Print(pJson);
542 543 544
  cJSON_Delete(pJson);
  return serialized;
}
M
Minghao Li 已提交
545

546
// receiver do something
M
Minghao Li 已提交
547
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
548
  // get receiver
549 550
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
551
  int32_t                writeCode = 0;
M
Minghao Li 已提交
552

553 554
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
555 556 557
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        // begin
558
        snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId);
M
Minghao Li 已提交
559 560 561
        pReceiver->ack = pMsg->seq;
        needRsp = true;

M
Minghao Li 已提交
562 563 564
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
565 566 567

        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
568 569 570 571 572
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld, recv msg:%s",
              pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
              msgStr);
573 574
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
575 576 577 578
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld",
              pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
579
        }
M
Minghao Li 已提交
580

M
Minghao Li 已提交
581 582
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
583 584 585
        writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
        ASSERT(writeCode == 0);

M
Minghao Li 已提交
586
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
M
Minghao Li 已提交
587
        pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
588

589 590
        // maybe update lastconfig
        if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
591 592 593 594 595 596 597 598 599 600 601 602
          // update new config myIndex
          bool     IamInNew = false;
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
            if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
                pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
              newSyncCfg.myIndex = i;
              IamInNew = true;
              break;
            }
          }

603
          bool isDrop;
604
          if (IamInNew) {
M
Minghao Li 已提交
605
            sDebug("vgId:%d sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ",
606
                   pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
607 608
            syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
          } else {
M
Minghao Li 已提交
609 610
            sDebug(
                "vgId:%d sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, "
611 612 613 614 615 616 617 618 619 620 621
                "lastConfigIndex:%ld ",
                pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
          }

          // change isStandBy to normal
          if (!isDrop) {
            if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
              syncNodeBecomeLeader(pSyncNode, "config change");
            } else {
              syncNodeBecomeFollower(pSyncNode, "config change");
            }
622
          }
623 624
        }

M
Minghao Li 已提交
625 626
        SSnapshot snapshot;
        pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
627

M
Minghao Li 已提交
628 629 630
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
631 632 633

        if (gRaftDetailLog) {
          char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
M
Minghao Li 已提交
634 635
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
636
              "snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
637
              "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, raft log:%s",
638
              pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
M
Minghao Li 已提交
639
              snapshot.lastConfigIndex, logSimpleStr);
640 641
          taosMemoryFree(logSimpleStr);
        } else {
M
Minghao Li 已提交
642 643
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
644
              "snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
645 646 647
              "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld",
              pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
              snapshot.lastConfigIndex);
648
        }
M
Minghao Li 已提交
649

M
Minghao Li 已提交
650
        pReceiver->pWriter = NULL;
651
        snapshotReceiverStop(pReceiver, true);
M
Minghao Li 已提交
652
        pReceiver->ack = pMsg->seq;
M
Minghao Li 已提交
653
        needRsp = true;
M
Minghao Li 已提交
654

655 656
        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
657 658 659 660 661
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld, recv msg:%s",
              pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex, msgStr);
662 663
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
664 665 666 667 668
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld",
              pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex);
669
        }
M
Minghao Li 已提交
670

M
Minghao Li 已提交
671 672
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
673
        snapshotReceiverStop(pReceiver, false);
M
Minghao Li 已提交
674 675
        needRsp = false;

M
Minghao Li 已提交
676 677 678 679
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);

680 681
        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
682 683 684
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld, recv "
685
              "msg:%s",
M
Minghao Li 已提交
686 687
              pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex, msgStr);
688 689
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
690 691 692 693 694
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld",
              pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
              pMsg->lastConfigIndex);
695
        }
M
Minghao Li 已提交
696

M
Minghao Li 已提交
697 698 699
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
700 701 702
          writeCode =
              pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
          ASSERT(writeCode == 0);
M
Minghao Li 已提交
703 704 705 706
          pReceiver->ack = pMsg->seq;
        }
        needRsp = true;

M
Minghao Li 已提交
707 708 709
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
710 711 712

        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
713 714 715 716 717
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld, recv msg:%s",
              pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
              msgStr);
718 719
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
720 721 722 723
          sDebug(
              "vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
              "lastConfigIndex:%ld",
              pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
724
        }
M
Minghao Li 已提交
725

M
Minghao Li 已提交
726 727
      } else {
        ASSERT(0);
728
      }
M
Minghao Li 已提交
729

M
Minghao Li 已提交
730 731 732 733 734 735 736 737
      if (needRsp) {
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
        pRspMsg->ack = pReceiver->ack;
738
        pRspMsg->code = writeCode;
M
Minghao Li 已提交
739
        pRspMsg->privateTerm = pReceiver->privateTerm;
M
Minghao Li 已提交
740 741 742 743

        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
M
Minghao Li 已提交
744

M
Minghao Li 已提交
745 746
        syncSnapshotRspDestroy(pRspMsg);
      }
M
Minghao Li 已提交
747
    }
M
Minghao Li 已提交
748 749
  } else {
    syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
M
Minghao Li 已提交
750
  }
M
Minghao Li 已提交
751

M
Minghao Li 已提交
752 753 754
  return 0;
}

M
Minghao Li 已提交
755 756
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
757
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
758 759 760 761 762 763
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    sInfo("recv SyncSnapshotRsp maybe replica already dropped");
    return 0;
  }

M
Minghao Li 已提交
764
  // get sender
765
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
766 767 768 769 770
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
771 772
      // receiver ack is finish, close sender
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
773
        pSender->finish = true;
M
Minghao Li 已提交
774 775 776 777 778
        snapshotSenderStop(pSender);
        return 0;
      }

      // send next msg
779
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
780
        // update sender ack
781 782
        pSender->ack = pMsg->ack;
        (pSender->seq)++;
M
Minghao Li 已提交
783
        snapshotSend(pSender);
784

M
Minghao Li 已提交
785 786
      } else if (pMsg->ack == pSender->seq - 1) {
        snapshotReSend(pSender);
787

M
Minghao Li 已提交
788 789
      } else {
        ASSERT(0);
790 791
      }
    }
M
Minghao Li 已提交
792 793
  } else {
    syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
794 795 796
  }

  return 0;
797
}