/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
#include "syncIndexMgr.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"

static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
M
Minghao Li 已提交
25

26
//----------------------------------
M
Minghao Li 已提交
27
/*
 * Allocate and initialise a snapshot sender for the replica at `replicaIndex`.
 *
 * A sender is only created when the FSM provides all three snapshot-read
 * callbacks (FpSnapshotStartRead / FpSnapshotStopRead / FpSnapshotDoRead);
 * otherwise an error is logged and NULL is returned.
 *
 * The returned object is released with snapshotSenderDestroy().
 */
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // bail out early when the FSM cannot read snapshots
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    sError("snapshotSenderCreate cannot create sender");
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // owner and target replica
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;

  // transfer state: not started, nothing sent, nothing cached
  pNew->start = false;
  pNew->finish = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;

  // terms: raft term from the store, private term seeded from the current timestamp
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = taosGetTimestampMs() + 100;

  // remember the snapshot the FSM currently holds
  pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &(pNew->snapshot));

  return pNew;
}
M
Minghao Li 已提交
56

M
Minghao Li 已提交
57 58
/*
 * Release a snapshot sender created by snapshotSenderCreate().
 *
 * Besides the cached block, a reader that is still open (the sender was
 * started but never stopped) is closed through FpSnapshotStopRead so that it
 * is not leaked together with the sender. Passing NULL is a no-op.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // close a reader left open by an unfinished transfer
  if (pSender->pReader != NULL) {
    int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    if (ret != 0) {
      // destruction is best effort: report the failure, keep tearing down
      sError("snapshotSenderDestroy stop reader error");
    }
    pSender->pReader = NULL;
  }

  // free the cached block of the last send
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
65

M
Minghao Li 已提交
66 67
// Return the sender's `start` flag (set by snapshotSenderStart, cleared by snapshotSenderStop).
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

M
Minghao Li 已提交
68
// begin send snapshot (current term, seq begin)
M
Minghao Li 已提交
69 70
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
71

M
Minghao Li 已提交
72 73
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
M
Minghao Li 已提交
74

M
Minghao Li 已提交
75 76
  // open snapshot reader
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
77 78 79
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
  ASSERT(ret == 0);

M
Minghao Li 已提交
80 81 82 83 84 85
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }

  pSender->blockLen = 0;

M
Minghao Li 已提交
86
  // get current snapshot info
M
Minghao Li 已提交
87
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
88 89 90 91

  sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld",
         pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);

92
  if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
93
    /*
94
    SSyncRaftEntry *pEntry = NULL;
95 96
    int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
                                                                  pSender->snapshot.lastConfigIndex, &pEntry);
97
    ASSERT(code == 0);
98 99 100 101 102 103
    ASSERT(pEntry != NULL);
    */

    SSyncRaftEntry *pEntry =
        pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex);
    ASSERT(pEntry != NULL);
104 105 106 107 108 109 110 111 112 113 114 115 116 117

    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);
    SSyncCfg lastConfig;
    int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
    ASSERT(ret == 0);
    pSender->lastConfig = lastConfig;

    rpcFreeCont(rpcMsg.pCont);
    syncEntryDestory(pEntry);

  } else {
    memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
  }
M
Minghao Li 已提交
118

M
Minghao Li 已提交
119 120 121
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);
122
  pSender->finish = false;
M
Minghao Li 已提交
123 124
  pSender->start = true;

M
Minghao Li 已提交
125
  // build begin msg
M
Minghao Li 已提交
126 127 128 129 130 131
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
132 133
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
134
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
135
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
136

M
Minghao Li 已提交
137
  // send msg
M
Minghao Li 已提交
138 139 140
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
141

M
Minghao Li 已提交
142 143 144
  char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send");
  syncNodeEventLog(pSender->pSyncNode, eventLog);
  taosMemoryFree(eventLog);
M
Minghao Li 已提交
145

M
Minghao Li 已提交
146 147 148 149
  syncSnapshotSendDestroy(pMsg);
}

/*
 * Stop the sender: close the snapshot reader if one is open, free the cached
 * block if there is one, and clear the `start` flag. When gRaftDetailLog is
 * set, the sender state after stopping is dumped to the log.
 */
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
  // close reader
  if (pSender->pReader != NULL) {
    int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(code == 0);
    pSender->pReader = NULL;
  }

  // release cached block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->blockLen = 0;
    pSender->pCurrentBlock = NULL;
  }

  pSender->start = false;

  if (gRaftDetailLog) {
    char *dump = snapshotSender2Str(pSender);
    sInfo("snapshotSenderStop %s", dump);
    taosMemoryFree(dump);
  }
}

M
Minghao Li 已提交
171 172
// when sender receiver ack, call this function to send msg from seq
// seq = ack + 1, already updated
M
Minghao Li 已提交
173 174 175 176
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
177
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
178 179 180 181 182 183 184
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
185 186 187 188 189 190
  if (pSender->blockLen > 0) {
    // has read data
  } else {
    // read finish
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
191

M
Minghao Li 已提交
192
  // build msg
M
Minghao Li 已提交
193 194 195 196 197 198
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
199 200
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
M
Minghao Li 已提交
201
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
202
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
203 204
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
205
  // send msg
M
Minghao Li 已提交
206 207 208
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
209 210

  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
M
Minghao Li 已提交
211 212 213 214
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender finish");
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);

M
Minghao Li 已提交
215
  } else {
M
Minghao Li 已提交
216 217 218
    char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender sending");
    syncNodeEventLog(pSender->pSyncNode, eventLog);
    taosMemoryFree(eventLog);
M
Minghao Li 已提交
219 220
  }

M
Minghao Li 已提交
221 222 223 224
  syncSnapshotSendDestroy(pMsg);
  return 0;
}

M
Minghao Li 已提交
225 226 227 228 229 230 231 232 233
/*
 * Send the cached block (pCurrentBlock) again with the current seq.
 *
 * Nothing is sent when no block is cached. The message carries the same
 * fields as the one built by snapshotSend(), including privateTerm, so a
 * retransmission is indistinguishable from the original send.
 *
 * Always returns 0.
 */
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock == NULL) {
    return 0;
  }

  // build msg from the cached block
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = pSender->seq;
  pMsg->privateTerm = pSender->privateTerm;  // was missing; snapshotSend() sets it on every message
  if (pSender->blockLen > 0) {
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
  }

  // send msg
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

  char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender resend");
  syncNodeEventLog(pSender->pSyncNode, eventLog);
  taosMemoryFree(eventLog);

  syncSnapshotSendDestroy(pMsg);
  return 0;
}

M
Minghao Li 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
/*
 * Build a JSON description of a sender for debugging.
 *
 * Returns a new object of the form { "SSyncSnapshotSender": { ... } }; the
 * inner object is empty when pSender is NULL. The caller owns the result and
 * releases it with cJSON_Delete().
 *
 * 64-bit values and pointers are emitted as strings.
 */
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  if (pSender != NULL) {
    cJSON_AddNumberToObject(pRoot, "start", pSender->start);
    cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
    cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
    cJSON_AddStringToObject(pRoot, "pReader", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
    cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);

    if (pSender->pCurrentBlock != NULL) {
      // NOTE(review): this adds a second "pCurrentBlock" key (block content) next to
      // the pointer value above; kept as is so the dump format does not change.
      char *s;
      s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
      taosMemoryFree(s);
      s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
      taosMemoryFree(s);
    }

    cJSON *pSnapshot = cJSON_CreateObject();
    // lastApplyIndex is printed as a signed value, like everywhere else in this file
    // (an unsigned conversion would render an invalid index of -1 as 18446744073709551615)
    snprintf(u64buf, sizeof(u64buf), "%ld", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
    cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);

    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
    cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
  }

  // wrap under the type name
  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
  return pJson;
}

/*
 * Render snapshotSender2Json(pSender) as text via cJSON_Print.
 * The intermediate JSON tree is deleted here; the returned string belongs to
 * the caller (callers in this file release it with taosMemoryFree).
 */
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pDoc = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pDoc);

  cJSON_Delete(pDoc);
  return text;
}

M
Minghao Li 已提交
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
/*
 * Format a one-line summary of the sender for the event log:
 * the event text, the sender address, snapshot indexes/term, seq/ack/finish,
 * private term, replica index and the target replica's host:port.
 *
 * Returns a newly allocated string (at most 255 characters, longer output is
 * truncated by snprintf); callers in this file release it with taosMemoryFree.
 * Allocation failure is asserted, following the convention used by the
 * create functions in this file, instead of handing NULL to snprintf.
 */
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
  int32_t len = 256;
  char   *s = taosMemoryMalloc(len);
  ASSERT(s != NULL);

  // zero-initialised so the output is well defined even if the address helper
  // leaves either value untouched
  SRaftId  destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  char     host[128] = {0};
  uint16_t port = 0;
  syncUtilU642Addr(destId.addr, host, sizeof(host), &port);

  snprintf(s, len, "%s %p laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu replica-index:%d %s:%d",
           event, pSender, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
           pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->privateTerm,
           pSender->replicaIndex, host, port);

  return s;
}

M
Minghao Li 已提交
326
// -------------------------------------
327
/*
 * Allocate and initialise a snapshot receiver that expects data from `fromId`.
 *
 * A receiver is only created when the FSM provides all three snapshot-write
 * callbacks (FpSnapshotStartWrite / FpSnapshotStopWrite / FpSnapshotDoWrite);
 * otherwise a message is logged and NULL is returned.
 *
 * The returned object is released with snapshotReceiverDestroy().
 */
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
  // bail out early when the FSM cannot write snapshots
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    sInfo("snapshotReceiverCreate cannot create receiver");
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // owner and peer
  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;

  // transfer state: not started, no writer open
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  // terms
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = 0;

  return pNew;
}
M
Minghao Li 已提交
351

352 353 354 355 356
/*
 * Release a snapshot receiver created by snapshotReceiverCreate().
 *
 * A writer that is still open (the receiver was started but the transfer
 * never finished) is closed through FpSnapshotStopWrite with its last
 * argument false, the same value the abort paths in this file pass, so
 * that it is not leaked together with the receiver. Passing NULL is a no-op.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close a writer left open by an unfinished transfer
  if (pReceiver->pWriter != NULL) {
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    if (ret != 0) {
      // destruction is best effort: report the failure, keep tearing down
      sError("snapshotReceiverDestroy stop writer error");
    }
    pReceiver->pWriter = NULL;
  }

  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
357

358 359
// Return the receiver's `start` flag (set by snapshotReceiverStart, cleared by snapshotReceiverStop).
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

M
Minghao Li 已提交
360
// begin receive snapshot msg (current term, seq begin)
361
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
M
Minghao Li 已提交
362
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
363
  pReceiver->privateTerm = privateTerm;
M
Minghao Li 已提交
364
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
365
  pReceiver->fromId = fromId;
M
Minghao Li 已提交
366 367 368 369 370 371 372 373

  ASSERT(pReceiver->pWriter == NULL);
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
  ASSERT(ret == 0);
}

// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
374
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
375
  if (!snapshotReceiverIsStart(pReceiver)) {
M
Minghao Li 已提交
376
    // start
377
    snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
M
Minghao Li 已提交
378
    pReceiver->start = true;
M
Minghao Li 已提交
379

380
  } else {
M
Minghao Li 已提交
381
    // already start
382
    sInfo("snapshot recv, receiver already start");
M
Minghao Li 已提交
383 384 385 386 387 388 389 390

    // force close, abandon incomplete data
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;

    // start again
391
    snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
M
Minghao Li 已提交
392
    pReceiver->start = true;
393
  }
M
Minghao Li 已提交
394

395 396 397 398 399
  if (gRaftDetailLog) {
    char *s = snapshotReceiver2Str(pReceiver);
    sInfo("snapshotReceiverStart %s", s);
    taosMemoryFree(s);
  }
400
}
M
Minghao Li 已提交
401

402
/*
 * Stop the receiver: if a writer is still open it is stopped with its last
 * argument false and cleared, then the `start` flag is reset. When
 * gRaftDetailLog is set, the receiver state after stopping is dumped.
 *
 * `apply` currently has no effect: the writer, when present, is always
 * stopped with false regardless of its value.
 */
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
  (void)apply;

  // close writer
  if (pReceiver->pWriter != NULL) {
    int32_t code =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  if (gRaftDetailLog) {
    char *dump = snapshotReceiver2Str(pReceiver);
    sInfo("snapshotReceiverStop %s", dump);
    taosMemoryFree(dump);
  }
}

/*
 * Build a JSON description of a receiver for debugging.
 *
 * Returns a new object of the form { "SSyncSnapshotReceiver": { ... } }; the
 * inner object is empty when pReceiver is NULL. The caller owns the result
 * and releases it with cJSON_Delete().
 *
 * 64-bit values and pointers are emitted as strings.
 */
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   numBuf[128];
  cJSON *pBody = cJSON_CreateObject();

  if (pReceiver != NULL) {
    // scalar state
    cJSON_AddNumberToObject(pBody, "start", pReceiver->start);
    cJSON_AddNumberToObject(pBody, "ack", pReceiver->ack);

    // pointers, printed as text
    snprintf(numBuf, sizeof(numBuf), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pBody, "pWriter", numBuf);

    snprintf(numBuf, sizeof(numBuf), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pBody, "pSyncNode", numBuf);

    // peer id: raw address, decoded host/port, vgId
    cJSON *pPeer = cJSON_CreateObject();
    snprintf(numBuf, sizeof(numBuf), "%lu", pReceiver->fromId.addr);
    cJSON_AddStringToObject(pPeer, "addr", numBuf);

    char     host[128] = {0};
    uint16_t port;
    uint64_t rawAddr = pReceiver->fromId.addr;
    syncUtilU642Addr(rawAddr, host, sizeof(host), &port);
    cJSON_AddStringToObject(pPeer, "addr_host", host);
    cJSON_AddNumberToObject(pPeer, "addr_port", port);

    cJSON_AddNumberToObject(pPeer, "vgId", pReceiver->fromId.vgId);
    cJSON_AddItemToObject(pBody, "fromId", pPeer);

    // terms
    snprintf(numBuf, sizeof(numBuf), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pBody, "term", numBuf);

    snprintf(numBuf, sizeof(numBuf), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pBody, "privateTerm", numBuf);
  }

  // wrap under the type name
  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotReceiver", pBody);
  return pWrapper;
}

/*
 * Render snapshotReceiver2Json(pReceiver) as text via cJSON_Print.
 * The intermediate JSON tree is deleted here; the returned string belongs to
 * the caller (callers in this file release it with taosMemoryFree).
 */
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pDoc = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pDoc);

  cJSON_Delete(pDoc);
  return text;
}
M
Minghao Li 已提交
470

M
Minghao Li 已提交
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485
/*
 * Format a one-line summary of the receiver for the event log:
 * the event text, the receiver address, start/ack, term, private term and
 * the sending peer's host:port.
 *
 * Returns a newly allocated string (at most 255 characters, longer output is
 * truncated by snprintf); callers in this file release it with taosMemoryFree.
 * Allocation failure is asserted, following the convention used by the
 * create functions in this file, instead of handing NULL to snprintf.
 */
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
  int32_t len = 256;
  char   *s = taosMemoryMalloc(len);
  ASSERT(s != NULL);

  // zero-initialised so the output is well defined even if the address helper
  // leaves either value untouched
  SRaftId  fromId = pReceiver->fromId;
  char     host[128] = {0};
  uint16_t port = 0;
  syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);

  snprintf(s, len, "%s %p start:%d ack:%d term:%lu pterm:%lu %s:%d ", event, pReceiver, pReceiver->start,
           pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port);

  return s;
}

486
// receiver do something
M
Minghao Li 已提交
487
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
488
  // get receiver
489 490
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
491
  int32_t                writeCode = 0;
M
Minghao Li 已提交
492

493 494
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
495 496 497
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        // begin
498
        snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId);
M
Minghao Li 已提交
499 500 501
        pReceiver->ack = pMsg->seq;
        needRsp = true;

M
Minghao Li 已提交
502 503 504
        char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver begin");
        syncNodeEventLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
M
Minghao Li 已提交
505

M
Minghao Li 已提交
506 507
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
508 509 510
        writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
        ASSERT(writeCode == 0);

M
Minghao Li 已提交
511
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
M
Minghao Li 已提交
512
        pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
513

514 515
        // maybe update lastconfig
        if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
M
Minghao Li 已提交
516 517
          // int32_t  oldReplicaNum = pSyncNode->replicaNum;
          SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
518

519 520 521 522 523
          // update new config myIndex
          SSyncCfg newSyncCfg = pMsg->lastConfig;
          syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
          bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg);

M
Minghao Li 已提交
524
          bool isDrop = false;
525
          if (IamInNew) {
M
Minghao Li 已提交
526 527 528 529 530 531
            char eventLog[128];
            snprintf(eventLog, sizeof(eventLog),
                     "update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld", pMsg->lastIndex,
                     pMsg->lastTerm, pMsg->lastConfigIndex);
            syncNodeEventLog(pSyncNode, eventLog);

532
            syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
M
Minghao Li 已提交
533

534
          } else {
M
Minghao Li 已提交
535 536 537 538 539
            char eventLog[128];
            snprintf(eventLog, sizeof(eventLog),
                     "do not update config by snapshot, not in new, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld",
                     pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
            syncNodeEventLog(pSyncNode, eventLog);
540 541 542 543
          }

          // change isStandBy to normal
          if (!isDrop) {
M
Minghao Li 已提交
544
            char  tmpbuf[512];
M
Minghao Li 已提交
545 546
            char *oldStr = syncCfg2SimpleStr(&oldSyncCfg);
            char *newStr = syncCfg2SimpleStr(&newSyncCfg);
547 548
            snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s  -->  %s",
                     oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
M
Minghao Li 已提交
549 550 551
            taosMemoryFree(oldStr);
            taosMemoryFree(newStr);

552
            if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
553
              syncNodeBecomeLeader(pSyncNode, tmpbuf);
554
            } else {
555
              syncNodeBecomeFollower(pSyncNode, tmpbuf);
556
            }
557
          }
558 559
        }

M
Minghao Li 已提交
560 561
        SSnapshot snapshot;
        pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
562

M
Minghao Li 已提交
563 564 565 566 567
        do {
          char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish");
          syncNodeEventLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);
M
Minghao Li 已提交
568

M
Minghao Li 已提交
569
        pReceiver->pWriter = NULL;
570
        snapshotReceiverStop(pReceiver, true);
M
Minghao Li 已提交
571
        pReceiver->ack = pMsg->seq;
M
Minghao Li 已提交
572
        needRsp = true;
M
Minghao Li 已提交
573

M
Minghao Li 已提交
574 575 576 577 578
        do {
          char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver end");
          syncNodeEventLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);
M
Minghao Li 已提交
579

M
Minghao Li 已提交
580 581
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
582
        snapshotReceiverStop(pReceiver, false);
M
Minghao Li 已提交
583 584
        needRsp = false;

M
Minghao Li 已提交
585 586 587 588 589
        do {
          char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force close");
          syncNodeEventLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);
M
Minghao Li 已提交
590

M
Minghao Li 已提交
591 592 593
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
594 595 596
          writeCode =
              pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
          ASSERT(writeCode == 0);
M
Minghao Li 已提交
597 598 599 600
          pReceiver->ack = pMsg->seq;
        }
        needRsp = true;

M
Minghao Li 已提交
601 602 603 604 605
        do {
          char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
          syncNodeEventLog(pSyncNode, eventLog);
          taosMemoryFree(eventLog);
        } while (0);
M
Minghao Li 已提交
606

M
Minghao Li 已提交
607 608
      } else {
        ASSERT(0);
609
      }
M
Minghao Li 已提交
610

M
Minghao Li 已提交
611 612 613 614 615 616 617 618
      if (needRsp) {
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
        pRspMsg->ack = pReceiver->ack;
619
        pRspMsg->code = writeCode;
M
Minghao Li 已提交
620
        pRspMsg->privateTerm = pReceiver->privateTerm;
M
Minghao Li 已提交
621 622 623 624

        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
M
Minghao Li 已提交
625

M
Minghao Li 已提交
626 627
        syncSnapshotRspDestroy(pRspMsg);
      }
M
Minghao Li 已提交
628
    }
M
Minghao Li 已提交
629 630
  } else {
    syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
M
Minghao Li 已提交
631
  }
M
Minghao Li 已提交
632

M
Minghao Li 已提交
633 634 635
  return 0;
}

M
Minghao Li 已提交
636 637
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
638
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
639 640 641 642 643 644
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    sInfo("recv SyncSnapshotRsp maybe replica already dropped");
    return 0;
  }

M
Minghao Li 已提交
645
  // get sender
646
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
647 648 649 650 651
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
652 653
      // receiver ack is finish, close sender
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
654
        pSender->finish = true;
M
Minghao Li 已提交
655 656 657 658 659
        snapshotSenderStop(pSender);
        return 0;
      }

      // send next msg
660
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
661
        // update sender ack
662 663
        pSender->ack = pMsg->ack;
        (pSender->seq)++;
M
Minghao Li 已提交
664
        snapshotSend(pSender);
665

M
Minghao Li 已提交
666 667
      } else if (pMsg->ack == pSender->seq - 1) {
        snapshotReSend(pSender);
668

M
Minghao Li 已提交
669 670
      } else {
        ASSERT(0);
671 672
      }
    }
M
Minghao Li 已提交
673 674
  } else {
    syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
675 676 677
  }

  return 0;
678
}