syncSnapshot.c 33.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19
#include "syncRaftLog.h"
M
Minghao Li 已提交
20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
22
#include "wal.h"
M
Minghao Li 已提交
23

24
// Forward declaration of the file-local helper that resets a receiver's
// term/privateTerm/ack/fromId and opens the FSM snapshot writer; defined below.
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
M
Minghao Li 已提交
25

26
//----------------------------------
M
Minghao Li 已提交
27
/*
 * Create a snapshot sender for the replica at `replicaIndex`.
 *
 * A sender can only be created when the FSM provides every read-side snapshot
 * callback this file invokes: FpGetSnapshot, FpSnapshotStartRead,
 * FpSnapshotStopRead and FpSnapshotDoRead.
 *
 * Returns the new sender, or NULL (after logging) when a callback is missing
 * or allocation fails. Release the result with snapshotSenderDestroy().
 */
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // FpGetSnapshot is called unconditionally below (and in snapshotSenderStart),
  // so it must be part of the capability check as well.
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL) && (pSyncNode->pFsm->FpGetSnapshot != NULL);
  if (!condition) {
    sError("snapshotSenderCreate cannot create sender");
    return NULL;
  }

  SSyncSnapshotSender *pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  ASSERT(pSender != NULL);
  if (pSender == NULL) {
    // ASSERT may be compiled out in release builds; never memset a NULL pointer.
    sError("snapshotSenderCreate out of memory");
    return NULL;
  }
  memset(pSender, 0, sizeof(*pSender));

  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->pCurrentBlock = NULL;
  pSender->blockLen = 0;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
  pSender->term = pSyncNode->pRaftStore->currentTerm;
  // NOTE(review): seeded from wall-clock time, presumably so privateTerm values
  // differ across sender instances/restarts; snapshotSenderStart() increments it.
  pSender->privateTerm = taosGetTimestampMs() + 100;
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
  pSender->finish = false;

  return pSender;
}
M
Minghao Li 已提交
55

M
Minghao Li 已提交
56 57
/*
 * Free a sender created by snapshotSenderCreate(), together with any block
 * buffer it still caches. Passing NULL is a no-op.
 * NOTE(review): an open pReader is not closed here; stop the sender first.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }
  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
64

M
Minghao Li 已提交
65 66
/* Report whether the sender currently has a snapshot transfer in progress. */
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  bool started = pSender->start;
  return started;
}

M
Minghao Li 已提交
67
// begin send snapshot (current term, seq begin)
M
Minghao Li 已提交
68 69
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71 72
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75
  // open snapshot reader
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
76 77 78
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
  ASSERT(ret == 0);

M
Minghao Li 已提交
79 80 81 82 83 84
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }

  pSender->blockLen = 0;

M
Minghao Li 已提交
85
  // get current snapshot info
M
Minghao Li 已提交
86
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
87
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
88
    /*
89
    SSyncRaftEntry *pEntry = NULL;
90 91
    int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                                  pSender->snapshot.lastConfigIndex, &pEntry);
92
    ASSERT(code == 0);
93 94 95 96 97 98
    ASSERT(pEntry != NULL);
    */

    SSyncRaftEntry *pEntry =
        pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex);
    ASSERT(pEntry != NULL);
99 100 101 102 103 104 105 106 107 108 109 110 111 112

    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);
    SSyncCfg lastConfig;
    int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
    ASSERT(ret == 0);
    pSender->lastConfig = lastConfig;

    rpcFreeCont(rpcMsg.pCont);
    syncEntryDestory(pEntry);

  } else {
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }
M
Minghao Li 已提交
113

M
Minghao Li 已提交
114 115 116
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);
117
  pSender->finish = false;
M
Minghao Li 已提交
118 119
  pSender->start = true;

M
Minghao Li 已提交
120
  // build begin msg
M
Minghao Li 已提交
121 122 123 124 125 126
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
127 128
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
129
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
130
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
131

M
Minghao Li 已提交
132
  // send msg
M
Minghao Li 已提交
133 134 135
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
136

M
Minghao Li 已提交
137 138 139
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
140 141 142

  if (gRaftDetailLog) {
    char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
143
    sDebug(
M
Minghao Li 已提交
144
        "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
145
        "lastApplyTerm:%lu "
M
Minghao Li 已提交
146
        "lastConfigIndex:%ld privateTerm:%lu send "
147
        "msg:%s",
M
Minghao Li 已提交
148 149
        pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
        pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
150 151
        pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
        pSender->privateTerm, msgStr);
152 153
    taosMemoryFree(msgStr);
  } else {
M
Minghao Li 已提交
154
    sDebug(
M
Minghao Li 已提交
155
        "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
156
        "lastApplyTerm:%lu "
M
Minghao Li 已提交
157
        "lastConfigIndex:%ld privateTerm:%lu",
M
Minghao Li 已提交
158 159
        pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
        pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
160 161
        pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
        pSender->privateTerm);
162
  }
M
Minghao Li 已提交
163

M
Minghao Li 已提交
164 165 166
  syncSnapshotSendDestroy(pMsg);
}

167
#if 0
M
Minghao Li 已提交
168
// when entry in snapshot, start sender
M
Minghao Li 已提交
169 170
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  if (!(pSender->start)) {
M
Minghao Li 已提交
171
    // start
M
Minghao Li 已提交
172 173 174
    snapshotSenderDoStart(pSender);
    pSender->start = true;
  } else {
M
Minghao Li 已提交
175
    // already start
M
Minghao Li 已提交
176 177
    ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term);

M
Minghao Li 已提交
178
    // if current term is higher, need start again
M
Minghao Li 已提交
179 180 181 182 183 184 185 186 187 188 189 190 191
    if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) {
      // force peer rollback
      SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
      pMsg->srcId = pSender->pSyncNode->myRaftId;
      pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
      pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
      pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
      pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
      pMsg->seq = SYNC_SNAPSHOT_SEQ_FORCE_CLOSE;

      SRpcMsg rpcMsg;
      syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
      syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
192 193

      char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
194
      sTrace("snapshot send force close seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
M
Minghao Li 已提交
195 196
      taosMemoryFree(msgStr);

M
Minghao Li 已提交
197 198 199 200 201
      syncSnapshotSendDestroy(pMsg);

      // close reader
      int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
      ASSERT(ret == 0);
M
Minghao Li 已提交
202
      pSender->pReader = NULL;
M
Minghao Li 已提交
203 204 205

      // start again
      snapshotSenderDoStart(pSender);
M
Minghao Li 已提交
206
      pSender->start = true;
M
Minghao Li 已提交
207
    } else {
M
Minghao Li 已提交
208 209
      // current term, do nothing
      ASSERT(pSender->pSyncNode->pRaftStore->currentTerm == pSender->term);
M
Minghao Li 已提交
210 211
    }
  }
M
Minghao Li 已提交
212 213 214 215

  char *s = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStart %s", s);
  taosMemoryFree(s);
M
Minghao Li 已提交
216
}
217
#endif
M
Minghao Li 已提交
218 219

/*
 * End the current transfer on the sending side.
 *
 * Closes the FSM snapshot reader if one is open, releases the cached block and
 * clears the start flag. seq/ack are left untouched.
 */
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
  // release the reader opened by snapshotSenderStart()
  if (pSender->pReader != NULL) {
    int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(code == 0);
    pSender->pReader = NULL;
  }

  // drop the block kept around for snapshotReSend()
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->blockLen = 0;
    pSender->pCurrentBlock = NULL;
  }

  pSender->start = false;

  if (!gRaftDetailLog) {
    return;
  }
  char *pDesc = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStop %s", pDesc);
  taosMemoryFree(pDesc);
}

M
Minghao Li 已提交
241 242
// when sender receiver ack, call this function to send msg from seq
// seq = ack + 1, already updated
M
Minghao Li 已提交
243 244 245 246
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
247
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
248 249 250 251 252 253 254
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
255 256 257 258 259 260
  if (pSender->blockLen > 0) {
    // has read data
  } else {
    // read finish
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
261

M
Minghao Li 已提交
262
  // build msg
M
Minghao Li 已提交
263 264 265 266 267 268
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
269 270
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
271
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
272
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
273 274
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
275
  // send msg
M
Minghao Li 已提交
276 277 278
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
279

M
Minghao Li 已提交
280 281 282
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
283

M
Minghao Li 已提交
284
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
285 286
    if (gRaftDetailLog) {
      char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
287
      sDebug(
M
Minghao Li 已提交
288
          "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
289
          "lastApplyTerm:%lu "
M
Minghao Li 已提交
290
          "lastConfigIndex:%ld privateTerm:%lu send "
291
          "msg:%s",
M
Minghao Li 已提交
292 293
          pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
          pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
294 295
          pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
          pSender->privateTerm, msgStr);
296 297
      taosMemoryFree(msgStr);
    } else {
M
Minghao Li 已提交
298
      sDebug(
M
Minghao Li 已提交
299
          "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
300
          "lastApplyTerm:%lu "
M
Minghao Li 已提交
301
          "lastConfigIndex:%ld privateTerm:%lu",
M
Minghao Li 已提交
302 303
          pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
          pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
304 305
          pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
          pSender->privateTerm);
306
    }
M
Minghao Li 已提交
307
  } else {
M
Minghao Li 已提交
308
    sDebug(
M
Minghao Li 已提交
309
        "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld "
310
        "lastApplyTerm:%lu "
M
Minghao Li 已提交
311
        "lastConfigIndex:%ld privateTerm:%lu",
M
Minghao Li 已提交
312 313
        pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
        pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
314 315
        pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
        pSender->privateTerm);
M
Minghao Li 已提交
316 317
  }

M
Minghao Li 已提交
318 319 320 321
  syncSnapshotSendDestroy(pMsg);
  return 0;
}

M
Minghao Li 已提交
322 323 324 325 326 327 328 329 330
/*
 * Re-send the block cached by the last snapshotSend(), using the current seq.
 * Does nothing when no block is cached.
 *
 * NOTE(review): because of the pCurrentBlock guard, the BEGIN message (which
 * has no payload), and possibly the empty END message, are never re-sent from
 * here. Confirm the retry path covers them.
 *
 * Returns 0.
 */
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock != NULL) {
    SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
    pMsg->srcId = pSender->pSyncNode->myRaftId;
    pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
    pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
    pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
    pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
    pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
    pMsg->lastConfig = pSender->lastConfig;
    pMsg->seq = pSender->seq;
    // snapshotSenderStart() and snapshotSend() stamp privateTerm on every
    // message of the transfer; a resent block must carry the same value.
    pMsg->privateTerm = pSender->privateTerm;
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

    SRpcMsg rpcMsg;
    syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
    syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

    char     host[128];
    uint16_t port;
    syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);

    if (gRaftDetailLog) {
      char *msgStr = syncSnapshotSend2Str(pMsg);
      sDebug(
          "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send "
          "msg:%s",
          pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
          pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm,
          msgStr);
      taosMemoryFree(msgStr);
    } else {
      sDebug("vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu",
             pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
             pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm);
    }

    syncSnapshotSendDestroy(pMsg);
  }
  return 0;
}

M
Minghao Li 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391
/*
 * Build a cJSON description of the sender for debug logging, wrapped as
 * {"SSyncSnapshotSender": {...}}. A NULL sender yields an empty inner object.
 * The caller owns the returned tree and releases it with cJSON_Delete().
 */
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pSender->start);
    cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
    cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
    cJSON_AddStringToObject(pRoot, "pReader", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      // "pCurrentBlock" already holds the pointer value above; the dumps get
      // their own keys so the object has no duplicate member names.
      char *s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock1", s);
      taosMemoryFree(s);
      s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
      taosMemoryFree(s);
    }

    cJSON *pSnapshot = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
    cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);

    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
    cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
  return pJson;
}

/*
 * Render the sender as pretty-printed JSON text (see snapshotSender2Json()).
 * Callers in this file release the returned string with taosMemoryFree().
 */
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *pText = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return pText;
}

// -------------------------------------
422
/*
 * Create a snapshot receiver that expects data from `fromId`.
 *
 * A receiver can only be created when the FSM provides FpSnapshotStartWrite,
 * FpSnapshotStopWrite and FpSnapshotDoWrite.
 *
 * Returns the new receiver, or NULL (after logging) when a callback is missing
 * or allocation fails. Release the result with snapshotReceiverDestroy().
 */
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
  if (!condition) {
    sInfo("snapshotReceiverCreate cannot create receiver");
    return NULL;
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  ASSERT(pReceiver != NULL);
  if (pReceiver == NULL) {
    // ASSERT may be compiled out in release builds; never memset a NULL pointer.
    sError("snapshotReceiverCreate out of memory");
    return NULL;
  }
  memset(pReceiver, 0, sizeof(*pReceiver));

  pReceiver->start = false;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;
  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = pSyncNode->pRaftStore->currentTerm;
  pReceiver->privateTerm = 0;

  return pReceiver;
}
M
Minghao Li 已提交
446

447 448 449 450 451
/*
 * Free a receiver created by snapshotReceiverCreate(). Passing NULL is a no-op.
 * NOTE(review): an open pWriter is not closed here; stop the receiver first.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
452

453 454
/* Report whether the receiver currently has a snapshot transfer in progress. */
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  bool started = pReceiver->start;
  return started;
}

M
Minghao Li 已提交
455
// begin receive snapshot msg (current term, seq begin)
456
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
M
Minghao Li 已提交
457
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
458
  pReceiver->privateTerm = privateTerm;
M
Minghao Li 已提交
459
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
460
  pReceiver->fromId = fromId;
M
Minghao Li 已提交
461 462 463 464 465 466 467 468

  ASSERT(pReceiver->pWriter == NULL);
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
  ASSERT(ret == 0);
}

// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
469
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
470
  if (!snapshotReceiverIsStart(pReceiver)) {
M
Minghao Li 已提交
471
    // start
472
    snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
M
Minghao Li 已提交
473
    pReceiver->start = true;
M
Minghao Li 已提交
474

475
  } else {
M
Minghao Li 已提交
476
    // already start
477
    sInfo("snapshot recv, receiver already start");
M
Minghao Li 已提交
478 479 480 481 482 483 484 485

    // force close, abandon incomplete data
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;

    // start again
486
    snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
M
Minghao Li 已提交
487
    pReceiver->start = true;
488
  }
M
Minghao Li 已提交
489

490 491 492 493 494
  if (gRaftDetailLog) {
    char *s = snapshotReceiver2Str(pReceiver);
    sInfo("snapshotReceiverStart %s", s);
    taosMemoryFree(s);
  }
495
}
M
Minghao Li 已提交
496

497
/*
 * Stop the receiver: close the FSM snapshot writer if one is open and clear
 * the start flag.
 *
 * NOTE(review): `apply` is currently unused. The third argument passed to
 * FpSnapshotStopWrite here is always `false`, and the former use of `apply`
 * (incrementing privateTerm) was commented out. Confirm this is intended.
 */
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
  (void)apply;

  if (pReceiver->pWriter != NULL) {
    int32_t code =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }
  pReceiver->start = false;

  if (!gRaftDetailLog) {
    return;
  }
  char *pDesc = snapshotReceiver2Str(pReceiver);
  sInfo("snapshotReceiverStop %s", pDesc);
  taosMemoryFree(pDesc);
}

/*
 * Build a cJSON description of the receiver for debug logging, wrapped as
 * {"SSyncSnapshotReceiver": {...}}. A NULL receiver yields an empty inner
 * object. The caller owns the returned tree and releases it with cJSON_Delete().
 */
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pRoot = cJSON_CreateObject();

  if (pReceiver != NULL) {
    char text[128];

    cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
    cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);

    snprintf(text, sizeof(text), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pRoot, "pWriter", text);
    snprintf(text, sizeof(text), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", text);

    // sender identity: raw address, its decoded host/port, then vgId
    cJSON   *pFrom = cJSON_CreateObject();
    uint64_t rawAddr = pReceiver->fromId.addr;
    char     fromHost[128] = {0};
    uint16_t fromPort;
    snprintf(text, sizeof(text), "%lu", pReceiver->fromId.addr);
    cJSON_AddStringToObject(pFrom, "addr", text);
    syncUtilU642Addr(rawAddr, fromHost, sizeof(fromHost), &fromPort);
    cJSON_AddStringToObject(pFrom, "addr_host", fromHost);
    cJSON_AddNumberToObject(pFrom, "addr_port", fromPort);
    cJSON_AddNumberToObject(pFrom, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pRoot, "fromId", pFrom);

    snprintf(text, sizeof(text), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pRoot, "term", text);
    snprintf(text, sizeof(text), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", text);
  }

  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotReceiver", pRoot);
  return pWrapper;
}

/*
 * Render the receiver as pretty-printed JSON text (see snapshotReceiver2Json()).
 * Callers in this file release the returned string with taosMemoryFree().
 */
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *pText = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return pText;
}
M
Minghao Li 已提交
565

566
// receiver do something
M
Minghao Li 已提交
567
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
568
  // get receiver
569 570
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
571
  int32_t                writeCode = 0;
M
Minghao Li 已提交
572

573 574
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
575 576 577
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        // begin
578
        snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId);
M
Minghao Li 已提交
579 580 581
        pReceiver->ack = pMsg->seq;
        needRsp = true;

M
Minghao Li 已提交
582 583 584
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
585 586 587

        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
588
          sDebug(
M
Minghao Li 已提交
589 590
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, "
              "lastTerm:%lu, "
M
Minghao Li 已提交
591
              "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
M
Minghao Li 已提交
592 593
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
              pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
594 595
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
596
          sDebug(
M
Minghao Li 已提交
597 598
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, "
              "lastTerm:%lu, "
M
Minghao Li 已提交
599
              "lastConfigIndex:%ld privateTerm:%lu",
M
Minghao Li 已提交
600 601
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
              pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
602
        }
M
Minghao Li 已提交
603

M
Minghao Li 已提交
604 605
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
606 607 608
        writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
        ASSERT(writeCode == 0);

M
Minghao Li 已提交
609
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
M
Minghao Li 已提交
610
        pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
611

612 613
        // maybe update lastconfig
        if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
M
Minghao Li 已提交
614 615
          // int32_t  oldReplicaNum = pSyncNode->replicaNum;
          SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
616

617 618 619 620 621 622
          // update new config myIndex
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
          bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg);

#if 0
623 624 625 626 627 628 629 630 631 632 633
          // update new config myIndex
          bool     IamInNew = false;
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
            if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
                pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
              newSyncCfg.myIndex = i;
              IamInNew = true;
              break;
            }
          }
634
#endif
635

636
          bool isDrop;
637
          if (IamInNew) {
638
            sDebug(
M
Minghao Li 已提交
639
                "vgId:%d sync event %s currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, "
640
                "lastConfigIndex:%ld ",
M
Minghao Li 已提交
641 642
                pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
                pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
643 644
            syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
          } else {
M
Minghao Li 已提交
645
            sDebug(
M
Minghao Li 已提交
646
                "vgId:%d sync event %s currentTerm:%lu do not update config by snapshot, I am not in newCfg, "
647
                "lastIndex:%ld, lastTerm:%lu, "
648
                "lastConfigIndex:%ld ",
M
Minghao Li 已提交
649 650
                pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
                pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
651 652 653 654
          }

          // change isStandBy to normal
          if (!isDrop) {
M
Minghao Li 已提交
655 656 657 658 659 660 661 662 663 664
            char  tmpbuf[512];
            char *oldStr = syncCfg2Str(&oldSyncCfg);
            char *newStr = syncCfg2Str(&newSyncCfg);
            syncUtilJson2Line(oldStr);
            syncUtilJson2Line(newStr);
            snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s  -->  %s", oldSyncCfg.replicaNum,
                     newSyncCfg.replicaNum, oldStr, newStr);
            taosMemoryFree(oldStr);
            taosMemoryFree(newStr);

665
            if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
666
              syncNodeBecomeLeader(pSyncNode, tmpbuf);
667
            } else {
668
              syncNodeBecomeFollower(pSyncNode, tmpbuf);
669
            }
670
          }
671 672
        }

M
Minghao Li 已提交
673 674
        SSnapshot snapshot;
        pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
675

M
Minghao Li 已提交
676 677 678
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
679 680 681

        if (gRaftDetailLog) {
          char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
M
Minghao Li 已提交
682
          sDebug(
M
Minghao Li 已提交
683
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
684
              "snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
685
              "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
M
Minghao Li 已提交
686 687 688
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
              pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex,
              pReceiver->privateTerm, logSimpleStr);
689 690
          taosMemoryFree(logSimpleStr);
        } else {
M
Minghao Li 已提交
691
          sDebug(
M
Minghao Li 已提交
692
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
693
              "snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
694
              "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
M
Minghao Li 已提交
695 696 697
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
              pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex,
              pReceiver->privateTerm);
698
        }
M
Minghao Li 已提交
699

M
Minghao Li 已提交
700
        pReceiver->pWriter = NULL;
701
        snapshotReceiverStop(pReceiver, true);
M
Minghao Li 已提交
702
        pReceiver->ack = pMsg->seq;
M
Minghao Li 已提交
703
        needRsp = true;
M
Minghao Li 已提交
704

705 706
        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
707
          sDebug(
M
Minghao Li 已提交
708
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
M
Minghao Li 已提交
709
              "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
M
Minghao Li 已提交
710 711 712
              pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
              pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
              pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
713 714
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
715
          sDebug(
M
Minghao Li 已提交
716
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
M
Minghao Li 已提交
717
              "lastConfigIndex:%ld, privateTerm:%lu",
M
Minghao Li 已提交
718 719 720
              pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
              pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
              pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
721
        }
M
Minghao Li 已提交
722

M
Minghao Li 已提交
723 724
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
725
        snapshotReceiverStop(pReceiver, false);
M
Minghao Li 已提交
726 727
        needRsp = false;

M
Minghao Li 已提交
728 729 730 731
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);

732 733
        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
734
          sDebug(
M
Minghao Li 已提交
735
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
736
              "lastTerm:%lu, "
M
Minghao Li 已提交
737
              "lastConfigIndex:%ld, privateTerm:%lu, recv "
738
              "msg:%s",
M
Minghao Li 已提交
739 740 741
              pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
              pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
              pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
742 743
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
744
          sDebug(
M
Minghao Li 已提交
745
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
746
              "lastTerm:%lu, "
M
Minghao Li 已提交
747
              "lastConfigIndex:%ld, privateTerm:%lu",
M
Minghao Li 已提交
748 749 750
              pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
              pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
              pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
751
        }
M
Minghao Li 已提交
752

M
Minghao Li 已提交
753 754 755
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
756 757 758
          writeCode =
              pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
          ASSERT(writeCode == 0);
M
Minghao Li 已提交
759 760 761 762
          pReceiver->ack = pMsg->seq;
        }
        needRsp = true;

M
Minghao Li 已提交
763 764 765
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
766 767 768

        if (gRaftDetailLog) {
          char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
769
          sDebug(
M
Minghao Li 已提交
770
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
771
              "lastTerm:%lu, "
M
Minghao Li 已提交
772
              "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
M
Minghao Li 已提交
773 774
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
              pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
775 776
          taosMemoryFree(msgStr);
        } else {
M
Minghao Li 已提交
777
          sDebug(
M
Minghao Li 已提交
778
              "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
779
              "lastTerm:%lu, "
M
Minghao Li 已提交
780
              "lastConfigIndex:%ld, privateTerm:%lu",
M
Minghao Li 已提交
781 782
              pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
              pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
783
        }
M
Minghao Li 已提交
784

M
Minghao Li 已提交
785 786
      } else {
        ASSERT(0);
787
      }
M
Minghao Li 已提交
788

M
Minghao Li 已提交
789 790 791 792 793 794 795 796
      if (needRsp) {
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
        pRspMsg->ack = pReceiver->ack;
797
        pRspMsg->code = writeCode;
M
Minghao Li 已提交
798
        pRspMsg->privateTerm = pReceiver->privateTerm;
M
Minghao Li 已提交
799 800 801 802

        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
M
Minghao Li 已提交
803

M
Minghao Li 已提交
804 805
        syncSnapshotRspDestroy(pRspMsg);
      }
M
Minghao Li 已提交
806
    }
M
Minghao Li 已提交
807 808
  } else {
    syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
M
Minghao Li 已提交
809
  }
M
Minghao Li 已提交
810

M
Minghao Li 已提交
811 812 813
  return 0;
}

M
Minghao Li 已提交
814 815
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
816
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
817 818 819 820 821 822
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    sInfo("recv SyncSnapshotRsp maybe replica already dropped");
    return 0;
  }

M
Minghao Li 已提交
823
  // get sender
824
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
825 826 827 828 829
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
830 831
      // receiver ack is finish, close sender
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
832
        pSender->finish = true;
M
Minghao Li 已提交
833 834 835 836 837
        snapshotSenderStop(pSender);
        return 0;
      }

      // send next msg
838
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
839
        // update sender ack
840 841
        pSender->ack = pMsg->ack;
        (pSender->seq)++;
M
Minghao Li 已提交
842
        snapshotSend(pSender);
843

M
Minghao Li 已提交
844 845
      } else if (pMsg->ack == pSender->seq - 1) {
        snapshotReSend(pSender);
846

M
Minghao Li 已提交
847 848
      } else {
        ASSERT(0);
849 850
      }
    }
M
Minghao Li 已提交
851 852
  } else {
    syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
853 854 855
  }

  return 0;
856
}