syncSnapshot.c 20.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
17
#include "syncIndexMgr.h"
M
Minghao Li 已提交
18
#include "syncRaftLog.h"
M
Minghao Li 已提交
19 20
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
21
#include "wal.h"
M
Minghao Li 已提交
22

23
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm);
M
Minghao Li 已提交
24

25
//----------------------------------
M
Minghao Li 已提交
26
// Allocate and initialise a snapshot sender bound to `pSyncNode` for the
// replica at `replicaIndex`.
// Returns NULL (after logging an error) when the FSM does not provide all
// three snapshot-read callbacks; otherwise returns a heap object that the
// caller releases with snapshotSenderDestroy().
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // The sender cannot work without the complete read API.
  bool missingReadApi = (pSyncNode->pFsm->FpSnapshotStartRead == NULL) ||
                        (pSyncNode->pFsm->FpSnapshotStopRead == NULL) ||
                        (pSyncNode->pFsm->FpSnapshotDoRead == NULL);
  if (missingReadApi) {
    sError("snapshotSenderCreate cannot create sender");
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // transfer state: idle, nothing sent or acknowledged yet
  pNew->start = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // no reader and no cached block until the sender is started
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;

  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;
  pNew->term = pSyncNode->pRaftStore->currentTerm;

  // private term is seeded from the wall clock (ms) plus a fixed offset
  pNew->privateTerm = taosGetTimestampMs() + 100;

  // record the snapshot info currently reported by the FSM
  pNew->pSyncNode->pFsm->FpGetSnapshot(pNew->pSyncNode->pFsm, &(pNew->snapshot));
  pNew->finish = false;

  return pNew;
}
M
Minghao Li 已提交
54

M
Minghao Li 已提交
55 56
// Release a sender created by snapshotSenderCreate(), including any cached
// data block. Passing NULL is a no-op.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // drop the cached block first, then the sender itself
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }
  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
63

M
Minghao Li 已提交
64 65
// Report whether the sender is currently marked as started.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  bool started = pSender->start;
  return started;
}

M
Minghao Li 已提交
66
// begin send snapshot (current term, seq begin)
M
Minghao Li 已提交
67 68
// Start a snapshot transfer: reset seq/ack, open the FSM snapshot reader,
// refresh the snapshot info, bump the private term and send the BEGIN
// message to the peer replica.
// Must not be called while the sender is already started, and the reader
// must be closed (both are asserted).
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  ASSERT(!snapshotSenderIsStart(pSender));

  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;

  // open snapshot reader
  ASSERT(pSender->pReader == NULL);
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
  ASSERT(ret == 0);

  // Drop any block left over from a previous transfer. The pointer must be
  // cleared as well: snapshotSend() and snapshotSenderStop() free
  // pCurrentBlock whenever it is non-NULL, so a stale value here would be
  // freed a second time.
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  pSender->blockLen = 0;

  // get current snapshot info
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));

  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);
  pSender->finish = false;
  pSender->start = true;

  // build begin msg (no payload)
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
  pMsg->privateTerm = pSender->privateTerm;

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

  char *msgStr = syncSnapshotSend2Str(pMsg);
  sTrace("snapshot send begin seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
  taosMemoryFree(msgStr);

  syncSnapshotSendDestroy(pMsg);
}

115
#if 0
// Disabled legacy variant of snapshotSenderStart(); excluded from the build
// by the surrounding #if 0 and kept for reference only.
// when entry in snapshot, start sender
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  if (pSender->start) {
    // already start
    ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term);

    // if current term is higher, need start again
    if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) {
      // force peer rollback
      SyncSnapshotSend *pCloseMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
      pCloseMsg->srcId = pSender->pSyncNode->myRaftId;
      pCloseMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
      pCloseMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
      pCloseMsg->lastIndex = pSender->snapshot.lastApplyIndex;
      pCloseMsg->lastTerm = pSender->snapshot.lastApplyTerm;
      pCloseMsg->seq = SYNC_SNAPSHOT_SEQ_FORCE_CLOSE;

      SRpcMsg rpcMsg;
      syncSnapshotSend2RpcMsg(pCloseMsg, &rpcMsg);
      syncNodeSendMsgById(&(pCloseMsg->destId), pSender->pSyncNode, &rpcMsg);

      char *closeStr = syncSnapshotSend2Str(pCloseMsg);
      sTrace("snapshot send force close seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, closeStr);
      taosMemoryFree(closeStr);

      syncSnapshotSendDestroy(pCloseMsg);

      // close reader
      int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
      ASSERT(code == 0);
      pSender->pReader = NULL;

      // start again
      snapshotSenderDoStart(pSender);
      pSender->start = true;
    } else {
      // current term, do nothing
      ASSERT(pSender->pSyncNode->pRaftStore->currentTerm == pSender->term);
    }
  } else {
    // start
    snapshotSenderDoStart(pSender);
    pSender->start = true;
  }

  char *dump = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStart %s", dump);
  taosMemoryFree(dump);
}
#endif
M
Minghao Li 已提交
166 167

// Stop the sender: close the FSM snapshot reader if one is open, release the
// cached block, clear the started flag and log the resulting sender state.
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
  // close the reader (the FSM call is asserted to succeed)
  if (pSender->pReader != NULL) {
    int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(code == 0);
    pSender->pReader = NULL;
  }

  // forget the block cached for resending
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  pSender->start = false;

  char *dump = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStop %s", dump);
  taosMemoryFree(dump);
}

M
Minghao Li 已提交
187 188
// when sender receiver ack, call this function to send msg from seq
// seq = ack + 1, already updated
M
Minghao Li 已提交
189 190 191 192
// Read the next snapshot block from the FSM reader and send it to the peer
// using the sender's current seq (per the comment above this function, the
// caller has already advanced seq to ack + 1).
// When the reader yields no data (blockLen <= 0) the seq is switched to
// SYNC_SNAPSHOT_SEQ_END and an empty END message is sent instead.
// The block read here stays cached in pCurrentBlock so snapshotReSend() can
// retransmit it; it is freed on the next call.
// Always returns 0; a failing read trips the ASSERT.
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
  if (pSender->blockLen > 0) {
    // has read data
  } else {
    // read finish
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }

  // build msg
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->seq = pSender->seq;
  pMsg->privateTerm = pSender->privateTerm;

  // Copy the payload only when there is one: on the END message the block
  // pointer may be NULL, and memcpy() with a NULL source is undefined
  // behaviour even for a zero length.
  if (pSender->blockLen > 0) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

  char *msgStr = syncSnapshotSend2Str(pMsg);
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
    sTrace("snapshot send finish seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
  } else {
    sTrace("snapshot send sending seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
  }
  taosMemoryFree(msgStr);

  syncSnapshotSendDestroy(pMsg);
  return 0;
}

M
Minghao Li 已提交
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
// send snapshot data from cache
// Retransmit the block cached by the last snapshotSend() under the current
// seq. Does nothing when no block is cached. Always returns 0.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock != NULL) {
    SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
    pMsg->srcId = pSender->pSyncNode->myRaftId;
    pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
    pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
    pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
    pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
    pMsg->seq = pSender->seq;
    // Carry the private term like snapshotSend() and snapshotSenderStart()
    // do, so a resent message is identical to the one it repeats.
    pMsg->privateTerm = pSender->privateTerm;
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

    SRpcMsg rpcMsg;
    syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
    syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

    char *msgStr = syncSnapshotSend2Str(pMsg);
    sTrace("snapshot send resend seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
    taosMemoryFree(msgStr);

    syncSnapshotSendDestroy(pMsg);
  }
  return 0;
}

M
Minghao Li 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
// Build a cJSON tree describing the sender, wrapped as
// {"SSyncSnapshotSender": {...}}. A NULL sender yields an empty inner object.
// The caller owns the returned tree and releases it with cJSON_Delete().
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pSender->start);
    cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
    cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);

    // %p requires a void * argument, hence the explicit casts below
    snprintf(u64buf, sizeof(u64buf), "%p", (void *)pSender->pReader);
    cJSON_AddStringToObject(pRoot, "pReader", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", (void *)pSender->pCurrentBlock);
    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      // NOTE(review): "pCurrentBlock" is added a second time here, so the
      // object carries a duplicate key (address above, contents here).
      char *s;
      s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
      taosMemoryFree(s);
      s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
      taosMemoryFree(s);
    }

    // snapshot info is nested under its own "snapshot" object
    // NOTE(review): the %lu conversions assume these fields are unsigned long
    // wide on the target; confirm against their declared types.
    cJSON *pSnapshot = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
    cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);

    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", (void *)pSender->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
    cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
  return pJson;
}

// Render the sender as a JSON string produced by cJSON_Print().
// The caller frees the returned string (callers in this file use
// taosMemoryFree()).
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

// -------------------------------------
319
// Allocate and initialise a snapshot receiver bound to `pSyncNode`.
// Returns NULL (after an info log) when the FSM does not provide all three
// snapshot-write callbacks; otherwise the caller releases the object with
// snapshotReceiverDestroy().
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // The receiver cannot work without the complete write API.
  bool missingWriteApi = (pSyncNode->pFsm->FpSnapshotStartWrite == NULL) ||
                         (pSyncNode->pFsm->FpSnapshotStopWrite == NULL) ||
                         (pSyncNode->pFsm->FpSnapshotDoWrite == NULL);
  if (missingWriteApi) {
    sInfo("snapshotReceiverCreate cannot create receiver");
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // idle receiver: not started, no writer open
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = 0;

  return pNew;
}
M
Minghao Li 已提交
343

344 345 346 347 348
// Release a receiver created by snapshotReceiverCreate(). NULL is a no-op.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
349

350 351
// Report whether the receiver is currently marked as started.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  bool started = pReceiver->start;
  return started;
}

M
Minghao Li 已提交
352
// begin receive snapshot msg (current term, seq begin)
353
// Reset the receiver for a new transfer: adopt the node's current term and
// the given private term, rewind ack to SYNC_SNAPSHOT_SEQ_BEGIN and open an
// FSM snapshot writer. The previous writer must already be closed (asserted).
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->privateTerm = privateTerm;
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;

  ASSERT(pReceiver->pWriter == NULL);
  int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
  ASSERT(code == 0);
}

// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
365
// Start the receiver for a transfer identified by `privateTerm`.
// If it is already started, the open writer is force-closed (incomplete data
// abandoned) and the receiver is restarted; that path then hits ASSERT(0),
// exactly as in the original flow.
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
  bool wasRunning = snapshotReceiverIsStart(pReceiver);

  if (wasRunning) {
    // already start: force close, abandon incomplete data
    int32_t code =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  // (re)start
  snapshotReceiverDoStart(pReceiver, privateTerm);
  pReceiver->start = true;

  if (wasRunning) {
    // restarting a running receiver is treated as unexpected
    ASSERT(0);
  }

  char *dump = snapshotReceiver2Str(pReceiver);
  sInfo("snapshotReceiverStart %s", dump);
  taosMemoryFree(dump);
}
M
Minghao Li 已提交
391

392
// Stop the receiver: close any open writer (always with the `false` flag),
// clear the started flag and, when `apply` is true, bump the private term.
// The resulting state is logged.
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
  if (pReceiver->pWriter != NULL) {
    int32_t code =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  if (apply) {
    pReceiver->privateTerm += 1;
  }

  char *dump = snapshotReceiver2Str(pReceiver);
  sInfo("snapshotReceiverStop %s", dump);
  taosMemoryFree(dump);
}

// Build a cJSON tree describing the receiver, wrapped as
// {"SSyncSnapshotReceiver": {...}}. A NULL receiver yields an empty inner
// object. The caller owns the returned tree and releases it with
// cJSON_Delete().
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pReceiver != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
    cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);

    // %p requires a void * argument, hence the explicit casts below
    snprintf(u64buf, sizeof(u64buf), "%p", (void *)pReceiver->pWriter);
    cJSON_AddStringToObject(pRoot, "pWriter", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", (void *)pReceiver->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex);
    // NOTE(review): the %lu conversions assume the term fields are unsigned
    // long wide on the target; confirm against their declared type.
    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
  return pJson;
}

// Render the receiver as a JSON string produced by cJSON_Print().
// The caller frees the returned string (callers in this file use
// taosMemoryFree()).
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}
M
Minghao Li 已提交
443

444
// receiver do something
M
Minghao Li 已提交
445
// Receiver-side handler for one SyncSnapshotSend message.
//
// Only acts when this node is a follower and the message term equals the
// node's current term; a message with another term is ignored, and a
// non-follower just logs. Dispatch is on pMsg->seq:
//   - SYNC_SNAPSHOT_SEQ_BEGIN:       start the receiver and ack the seq.
//   - SYNC_SNAPSHOT_SEQ_END:         write the final data, close the writer
//                                    with apply=true, move the log begin index
//                                    to lastIndex + 1, stop the receiver, ack.
//   - SYNC_SNAPSHOT_SEQ_FORCE_CLOSE: close the writer with apply=false and
//                                    stop the receiver; no response is sent.
//   - seq strictly between BEGIN and END: write the block only if it is the
//                                    next expected one (ack + 1); always
//                                    respond with the current ack.
// Any other seq trips ASSERT(0). Always returns 0.
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // get receiver
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
  int32_t                writeCode = 0;

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        // begin
        snapshotReceiverStart(pReceiver, pMsg->privateTerm);
        pReceiver->ack = pMsg->seq;
        needRsp = true;

        char *msgStr = syncSnapshotSend2Str(pMsg);
        sTrace("snapshot recv begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex,
               pMsg->lastTerm, msgStr);
        taosMemoryFree(msgStr);

      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
        writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
        ASSERT(writeCode == 0);

        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);

        pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
        char     *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
        SSnapshot snapshot;
        pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
        sInfo(
            "snapshot recv finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
            "snapshot.lastApplyTerm:%lu, raft log:%s",
            pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
        taosMemoryFree(logSimpleStr);

        // writer was closed above; clear it so snapshotReceiverStop() does
        // not stop it again
        pReceiver->pWriter = NULL;
        snapshotReceiverStop(pReceiver, true);
        pReceiver->ack = pMsg->seq;
        needRsp = true;

        char *msgStr = syncSnapshotSend2Str(pMsg);
        sTrace("snapshot recv end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex,
               pMsg->lastTerm, msgStr);
        taosMemoryFree(msgStr);

      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
        // The writer has just been closed. Clear the handle, as the END
        // branch does, so snapshotReceiverStop() does not hand the same
        // writer to FpSnapshotStopWrite a second time.
        pReceiver->pWriter = NULL;
        snapshotReceiverStop(pReceiver, false);
        needRsp = false;

        char *msgStr = syncSnapshotSend2Str(pMsg);
        sTrace("snapshot recv force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
               pMsg->lastIndex, pMsg->lastTerm, msgStr);

        taosMemoryFree(msgStr);

      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transferring: accept only the next block in sequence
        if (pMsg->seq == pReceiver->ack + 1) {
          writeCode =
              pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
          ASSERT(writeCode == 0);
          pReceiver->ack = pMsg->seq;
        }
        // respond even for an out-of-order block, reporting the current ack
        needRsp = true;

        char *msgStr = syncSnapshotSend2Str(pMsg);
        sTrace("snapshot recv receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
               pMsg->lastIndex, pMsg->lastTerm, msgStr);
        taosMemoryFree(msgStr);

      } else {
        ASSERT(0);
      }

      if (needRsp) {
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
        pRspMsg->ack = pReceiver->ack;
        pRspMsg->code = writeCode;
        pRspMsg->privateTerm = pReceiver->privateTerm;

        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);

        syncSnapshotRspDestroy(pRspMsg);
      }
    }
  } else {
    syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
  }

  return 0;
}

M
Minghao Li 已提交
547 548
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
549
// Sender-side handler for one SyncSnapshotRsp message.
//
// Looks up the sender for the responding peer (asserted non-NULL). Acts only
// when this node is leader and the response term equals the current term:
//   - ack == SYNC_SNAPSHOT_SEQ_END: mark the sender finished and stop it.
//   - ack == seq:                   record the ack, advance seq, send next.
//   - ack == seq - 1:               resend the cached block.
//   - anything else:                ASSERT(0).
// Always returns 0.
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
  // get sender
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
  ASSERT(pSender != NULL);

  // state check
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
    return 0;
  }

  // term check: responses from another term are ignored
  if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
    return 0;
  }

  // receiver ack is finish, close sender
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    pSender->finish = true;
    snapshotSenderStop(pSender);
    return 0;
  }

  if (pMsg->ack == pSender->seq) {
    // peer has the current block: update sender ack and send next msg
    pSender->ack = pMsg->ack;
    pSender->seq += 1;
    snapshotSend(pSender);
  } else if (pMsg->ack == pSender->seq - 1) {
    // peer is one block behind: retransmit the cached block
    snapshotReSend(pSender);
  } else {
    ASSERT(0);
  }

  return 0;
}