/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
#include "syncIndexMgr.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
M
Minghao Li 已提交
22

23
// Forward declaration of the file-local helper used by snapshotReceiverStart:
// it records term/privateTerm, resets ack and opens the FSM snapshot writer.
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm);
M
Minghao Li 已提交
24

25
//----------------------------------
M
Minghao Li 已提交
26
// Allocate and initialise a snapshot sender bound to pSyncNode for the replica
// at replicaIndex.
//
// The sender is only created when the FSM supplies all three snapshot read
// callbacks (FpSnapshotStartRead, FpSnapshotStopRead, FpSnapshotDoRead);
// otherwise an error is logged and NULL is returned.
//
// On success the sender is not started, seq/ack are SYNC_SNAPSHOT_SEQ_INVALID,
// no reader or block is attached, privateTerm is seeded with the current
// timestamp in milliseconds plus 100, and the FSM's current snapshot info is
// captured into the sender.
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  bool canRead = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                 (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
  if (!canRead) {
    sError("snapshotSenderCreate cannot create sender");
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // identity
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;

  // transfer state
  pNew->start = false;
  pNew->finish = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;

  // reader / cached block
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;

  // terms and snapshot info
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = taosGetTimestampMs() + 100;
  pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &(pNew->snapshot));

  return pNew;
}
M
Minghao Li 已提交
54

M
Minghao Li 已提交
55 56
// Release a sender created by snapshotSenderCreate. NULL is accepted and ignored.
//
// Besides the cached block and the sender itself, a snapshot reader that is
// still open (sender destroyed without snapshotSenderStop) is closed through
// FpSnapshotStopRead so it is not leaked.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // close reader left open by an unfinished transfer
  if (pSender->pReader != NULL) {
    int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(ret == 0);
    pSender->pReader = NULL;
  }

  // free cached block
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
  }

  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
63

M
Minghao Li 已提交
64 65
// Return the sender's `start` flag (set by snapshotSenderStart, cleared by snapshotSenderStop).
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return pSender->start;
}

M
Minghao Li 已提交
66
// begin send snapshot (current term, seq begin)
M
Minghao Li 已提交
67 68
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  ASSERT(!snapshotSenderIsStart(pSender));
M
Minghao Li 已提交
69

M
Minghao Li 已提交
70 71
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
M
Minghao Li 已提交
72

M
Minghao Li 已提交
73 74
  // open snapshot reader
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
75 76 77
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
  ASSERT(ret == 0);

M
Minghao Li 已提交
78 79 80 81 82 83
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
  }

  pSender->blockLen = 0;

M
Minghao Li 已提交
84
  // get current snapshot info
M
Minghao Li 已提交
85 86
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));

M
Minghao Li 已提交
87 88 89
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  ++(pSender->privateTerm);
90
  pSender->finish = false;
M
Minghao Li 已提交
91 92
  pSender->start = true;

M
Minghao Li 已提交
93
  // build begin msg
M
Minghao Li 已提交
94 95 96 97 98 99
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
M
Minghao Li 已提交
100
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
101
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
102

M
Minghao Li 已提交
103
  // send msg
M
Minghao Li 已提交
104 105 106
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
107

M
Minghao Li 已提交
108 109 110 111 112 113
  char    *msgStr = syncSnapshotSend2Str(pMsg);
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
  sTrace("sync event snapshot send to %s:%d begin seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack,
         msgStr);
M
Minghao Li 已提交
114 115
  taosMemoryFree(msgStr);

M
Minghao Li 已提交
116 117 118
  syncSnapshotSendDestroy(pMsg);
}

119
#if 0
// Disabled legacy variant of snapshotSenderStart; the surrounding #if 0 keeps
// it out of the build.
// NOTE(review): it calls snapshotSenderDoStart, which is not defined in the
// visible part of this file.
//
// when entry in snapshot, start sender
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  SSyncNode *pNode = pSender->pSyncNode;

  if (!(pSender->start)) {
    // start
    snapshotSenderDoStart(pSender);
    pSender->start = true;

  } else {
    // already start
    ASSERT(pNode->pRaftStore->currentTerm >= pSender->term);

    if (pNode->pRaftStore->currentTerm == pSender->term) {
      // current term, do nothing

    } else {
      // current term is higher, need start again: force peer rollback first
      SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pNode->vgId);
      pMsg->srcId = pNode->myRaftId;
      pMsg->destId = (pNode->replicasId)[pSender->replicaIndex];
      pMsg->term = pNode->pRaftStore->currentTerm;
      pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
      pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
      pMsg->seq = SYNC_SNAPSHOT_SEQ_FORCE_CLOSE;

      SRpcMsg rpcMsg;
      syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
      syncNodeSendMsgById(&(pMsg->destId), pNode, &rpcMsg);

      char *msgStr = syncSnapshotSend2Str(pMsg);
      sTrace("snapshot send force close seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
      taosMemoryFree(msgStr);

      syncSnapshotSendDestroy(pMsg);

      // close reader
      int32_t ret = pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
      ASSERT(ret == 0);
      pSender->pReader = NULL;

      // start again
      snapshotSenderDoStart(pSender);
      pSender->start = true;
    }
  }

  char *s = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStart %s", s);
  taosMemoryFree(s);
}
#endif
M
Minghao Li 已提交
170 171

// Stop the sender: close the FSM snapshot reader if one is open, release the
// cached block if there is one, clear the `start` flag, and log the resulting
// sender state at info level.
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
  // close reader
  if (pSender->pReader != NULL) {
    int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(code == 0);
    pSender->pReader = NULL;
  }

  // free cached block (blockLen is only reset when a block was present)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->pCurrentBlock = NULL;
    pSender->blockLen = 0;
  }

  pSender->start = false;

  // log final state
  char *senderStr = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStop %s", senderStr);
  taosMemoryFree(senderStr);
}

M
Minghao Li 已提交
191 192
// when sender receiver ack, call this function to send msg from seq
// seq = ack + 1, already updated
M
Minghao Li 已提交
193 194 195 196
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
197
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
198 199 200 201 202 203 204
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
205 206 207 208 209 210
  if (pSender->blockLen > 0) {
    // has read data
  } else {
    // read finish
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
211

M
Minghao Li 已提交
212
  // build msg
M
Minghao Li 已提交
213 214 215 216 217 218 219
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->seq = pSender->seq;
M
Minghao Li 已提交
220
  pMsg->privateTerm = pSender->privateTerm;
M
Minghao Li 已提交
221 222
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

M
Minghao Li 已提交
223
  // send msg
M
Minghao Li 已提交
224 225 226
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
227

M
Minghao Li 已提交
228 229 230 231
  char    *msgStr = syncSnapshotSend2Str(pMsg);
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
M
Minghao Li 已提交
232
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
M
Minghao Li 已提交
233 234
    sTrace("sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s",
           host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
M
Minghao Li 已提交
235
           msgStr);
M
Minghao Li 已提交
236
  } else {
M
Minghao Li 已提交
237 238
    sTrace("sync event snapshot send to %s:%d sending seq:%d ack:%d send msg:%s", host, port, pSender->seq,
           pSender->ack, msgStr);
M
Minghao Li 已提交
239 240 241
  }
  taosMemoryFree(msgStr);

M
Minghao Li 已提交
242 243 244 245
  syncSnapshotSendDestroy(pMsg);
  return 0;
}

M
Minghao Li 已提交
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
// Re-send the cached snapshot block (the one last read by snapshotSend) with
// the current seq. Does nothing when no block is cached. Always returns 0.
//
// The message is filled in the same way as in snapshotSend, including
// privateTerm, so a retransmission carries the same fields as the original.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock != NULL) {
    // build msg from the cached block
    SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
    pMsg->srcId = pSender->pSyncNode->myRaftId;
    pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
    pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
    pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
    pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
    pMsg->seq = pSender->seq;
    pMsg->privateTerm = pSender->privateTerm;  // was left unset on resend
    memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

    // send msg
    SRpcMsg rpcMsg;
    syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
    syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);

    // trace what was sent and to whom
    char    *msgStr = syncSnapshotSend2Str(pMsg);
    char     host[128];
    uint16_t port;
    syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
    sTrace("sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack,
           msgStr);
    taosMemoryFree(msgStr);

    syncSnapshotSendDestroy(pMsg);
  }
  return 0;
}

M
Minghao Li 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
// Build a cJSON tree describing the sender, shaped as
// {"SSyncSnapshotSender": {...fields...}}. When pSender is NULL the inner
// object is empty. Pointers and 64-bit values are rendered as strings.
// The returned tree is released by the caller (snapshotSender2Str uses cJSON_Delete).
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   text[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pSender != NULL) {
    // progress
    cJSON_AddNumberToObject(pFields, "start", pSender->start);
    cJSON_AddNumberToObject(pFields, "seq", pSender->seq);
    cJSON_AddNumberToObject(pFields, "ack", pSender->ack);

    // reader handle
    snprintf(text, sizeof(text), "%p", pSender->pReader);
    cJSON_AddStringToObject(pFields, "pReader", text);

    // cached block: address and length
    snprintf(text, sizeof(text), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pFields, "pCurrentBlock", text);
    cJSON_AddNumberToObject(pFields, "blockLen", pSender->blockLen);

    // cached block: content dumps
    // NOTE(review): "pCurrentBlock" is added a second time here, so the object
    // ends up with a duplicate key — confirm this is intended.
    if (pSender->pCurrentBlock != NULL) {
      char *dump = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock", dump);
      taosMemoryFree(dump);

      char *dump2 = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock2", dump2);
      taosMemoryFree(dump2);
    }

    // nested snapshot info
    cJSON *pSnap = cJSON_CreateObject();
    snprintf(text, sizeof(text), "%lu", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnap, "lastApplyIndex", text);
    snprintf(text, sizeof(text), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnap, "lastApplyTerm", text);
    cJSON_AddItemToObject(pFields, "snapshot", pSnap);

    // remaining scalar fields
    snprintf(text, sizeof(text), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pFields, "sendingMS", text);
    snprintf(text, sizeof(text), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", text);
    cJSON_AddNumberToObject(pFields, "replicaIndex", pSender->replicaIndex);
    snprintf(text, sizeof(text), "%lu", pSender->term);
    cJSON_AddStringToObject(pFields, "term", text);
    snprintf(text, sizeof(text), "%lu", pSender->privateTerm);
    cJSON_AddStringToObject(pFields, "privateTerm", text);
    cJSON_AddNumberToObject(pFields, "finish", pSender->finish);
  }

  // wrap under the type name
  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotSender", pFields);
  return pWrapper;
}

// Render the sender as JSON text: build the tree with snapshotSender2Json,
// print it with cJSON_Print, then delete the tree. Callers in this file
// release the returned string with taosMemoryFree.
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

// -------------------------------------
333
// Allocate and initialise a snapshot receiver bound to pSyncNode.
//
// The receiver is only created when the FSM supplies all three snapshot write
// callbacks (FpSnapshotStartWrite, FpSnapshotStopWrite, FpSnapshotDoWrite);
// otherwise a message is logged at info level and NULL is returned.
//
// On success the receiver is not started, ack is SYNC_SNAPSHOT_SEQ_BEGIN, no
// writer is attached, term is the node's current term and privateTerm is 0.
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  bool canWrite = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                  (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
  if (!canWrite) {
    sInfo("snapshotReceiverCreate cannot create receiver");
    return NULL;
  }

  SSyncSnapshotReceiver *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // identity
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;

  // transfer state
  pNew->start = false;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  // terms
  pNew->term = pSyncNode->pRaftStore->currentTerm;
  pNew->privateTerm = 0;

  return pNew;
}
M
Minghao Li 已提交
357

358 359 360 361 362
// Release a receiver created by snapshotReceiverCreate. NULL is accepted and ignored.
//
// A snapshot writer that is still open (receiver destroyed in the middle of a
// transfer) is closed through FpSnapshotStopWrite with its last argument false,
// the same value snapshotReceiverStop passes, so the writer is not leaked.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer left open by an unfinished transfer
  if (pReceiver->pWriter != NULL) {
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;
  }

  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
363

364 365
// Return the receiver's `start` flag (set by snapshotReceiverStart, cleared by snapshotReceiverStop).
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  return pReceiver->start;
}

M
Minghao Li 已提交
366
// begin receive snapshot msg (current term, seq begin)
367
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
M
Minghao Li 已提交
368
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
369
  pReceiver->privateTerm = privateTerm;
M
Minghao Li 已提交
370 371 372 373 374 375 376 377 378
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  ASSERT(pReceiver->pWriter == NULL);
  int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
  ASSERT(ret == 0);
}

// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
379
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
380
  if (!snapshotReceiverIsStart(pReceiver)) {
M
Minghao Li 已提交
381
    // start
382
    snapshotReceiverDoStart(pReceiver, privateTerm);
M
Minghao Li 已提交
383
    pReceiver->start = true;
M
Minghao Li 已提交
384

385
  } else {
M
Minghao Li 已提交
386 387 388 389 390 391 392 393 394
    // already start

    // force close, abandon incomplete data
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;

    // start again
395
    snapshotReceiverDoStart(pReceiver, privateTerm);
M
Minghao Li 已提交
396
    pReceiver->start = true;
M
Minghao Li 已提交
397

398 399
    ASSERT(0);
  }
M
Minghao Li 已提交
400 401 402 403

  char *s = snapshotReceiver2Str(pReceiver);
  sInfo("snapshotReceiverStart %s", s);
  taosMemoryFree(s);
404
}
M
Minghao Li 已提交
405

406
// Stop the receiver and log its resulting state at info level.
//
// A writer that is still open is closed through FpSnapshotStopWrite with its
// last argument false, regardless of `apply`. `apply` only decides whether
// privateTerm is incremented afterwards.
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
  // close writer
  if (pReceiver->pWriter != NULL) {
    int32_t code =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  if (apply) {
    ++(pReceiver->privateTerm);
  }

  // log final state
  char *receiverStr = snapshotReceiver2Str(pReceiver);
  sInfo("snapshotReceiverStop %s", receiverStr);
  taosMemoryFree(receiverStr);
}

// Build a cJSON tree describing the receiver, shaped as
// {"SSyncSnapshotReceiver": {...fields...}}. When pReceiver is NULL the inner
// object is empty. Pointers and 64-bit values are rendered as strings.
// The returned tree is released by the caller (snapshotReceiver2Str uses cJSON_Delete).
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   text[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pReceiver != NULL) {
    // progress
    cJSON_AddNumberToObject(pFields, "start", pReceiver->start);
    cJSON_AddNumberToObject(pFields, "ack", pReceiver->ack);

    // writer handle
    snprintf(text, sizeof(text), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pFields, "pWriter", text);

    // owner and replica
    snprintf(text, sizeof(text), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", text);
    cJSON_AddNumberToObject(pFields, "replicaIndex", pReceiver->replicaIndex);

    // terms
    snprintf(text, sizeof(text), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pFields, "term", text);
    snprintf(text, sizeof(text), "%lu", pReceiver->privateTerm);
    cJSON_AddStringToObject(pFields, "privateTerm", text);
  }

  // wrap under the type name
  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotReceiver", pFields);
  return pWrapper;
}

// Render the receiver as JSON text: build the tree with snapshotReceiver2Json,
// print it with cJSON_Print, then delete the tree. Callers in this file
// release the returned string with taosMemoryFree.
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}
M
Minghao Li 已提交
457

458
// receiver do something
M
Minghao Li 已提交
459
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
460
  // get receiver
461 462
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  bool                   needRsp = false;
463
  int32_t                writeCode = 0;
M
Minghao Li 已提交
464

465 466
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
467 468 469
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        // begin
470
        snapshotReceiverStart(pReceiver, pMsg->privateTerm);
M
Minghao Li 已提交
471 472 473
        pReceiver->ack = pMsg->seq;
        needRsp = true;

M
Minghao Li 已提交
474 475 476 477 478 479
        char    *msgStr = syncSnapshotSend2Str(pMsg);
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
        sTrace("sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port,
               pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
M
Minghao Li 已提交
480 481
        taosMemoryFree(msgStr);

M
Minghao Li 已提交
482 483
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
484 485 486
        writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
        ASSERT(writeCode == 0);

M
Minghao Li 已提交
487
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
M
Minghao Li 已提交
488

M
Minghao Li 已提交
489
        pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
M
Minghao Li 已提交
490
        char     *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
M
Minghao Li 已提交
491 492
        SSnapshot snapshot;
        pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
493 494 495
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
M
Minghao Li 已提交
496
        sInfo(
M
Minghao Li 已提交
497
            "sync event snapshot recv from %s:%d finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
M
Minghao Li 已提交
498
            "snapshot.lastApplyTerm:%lu, raft log:%s",
M
Minghao Li 已提交
499
            host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
M
Minghao Li 已提交
500 501
        taosMemoryFree(logSimpleStr);

M
Minghao Li 已提交
502
        pReceiver->pWriter = NULL;
503
        snapshotReceiverStop(pReceiver, true);
M
Minghao Li 已提交
504
        pReceiver->ack = pMsg->seq;
M
Minghao Li 已提交
505
        needRsp = true;
M
Minghao Li 已提交
506

M
Minghao Li 已提交
507
        char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
508 509
        sTrace("sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port,
               pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
M
Minghao Li 已提交
510 511
        taosMemoryFree(msgStr);

M
Minghao Li 已提交
512 513
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
514
        snapshotReceiverStop(pReceiver, false);
M
Minghao Li 已提交
515 516
        needRsp = false;

M
Minghao Li 已提交
517 518 519 520
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);

M
Minghao Li 已提交
521
        char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
522 523
        sTrace("sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host,
               port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
M
Minghao Li 已提交
524

M
Minghao Li 已提交
525 526
        taosMemoryFree(msgStr);

M
Minghao Li 已提交
527 528 529
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
530 531 532
          writeCode =
              pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
          ASSERT(writeCode == 0);
M
Minghao Li 已提交
533 534 535 536
          pReceiver->ack = pMsg->seq;
        }
        needRsp = true;

M
Minghao Li 已提交
537 538 539 540 541 542
        char    *msgStr = syncSnapshotSend2Str(pMsg);
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
        sTrace("sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host,
               port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
M
Minghao Li 已提交
543 544
        taosMemoryFree(msgStr);

M
Minghao Li 已提交
545 546
      } else {
        ASSERT(0);
547
      }
M
Minghao Li 已提交
548

M
Minghao Li 已提交
549 550 551 552 553 554 555 556
      if (needRsp) {
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
        pRspMsg->ack = pReceiver->ack;
557
        pRspMsg->code = writeCode;
M
Minghao Li 已提交
558
        pRspMsg->privateTerm = pReceiver->privateTerm;
M
Minghao Li 已提交
559 560 561 562

        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
M
Minghao Li 已提交
563

M
Minghao Li 已提交
564 565
        syncSnapshotRspDestroy(pRspMsg);
      }
M
Minghao Li 已提交
566
    }
M
Minghao Li 已提交
567 568
  } else {
    syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
M
Minghao Li 已提交
569
  }
M
Minghao Li 已提交
570

M
Minghao Li 已提交
571 572 573
  return 0;
}

M
Minghao Li 已提交
574 575
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
576
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
M
Minghao Li 已提交
577
  // get sender
578
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
579 580 581 582 583
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
584 585
      // receiver ack is finish, close sender
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
586
        pSender->finish = true;
M
Minghao Li 已提交
587 588 589 590 591
        snapshotSenderStop(pSender);
        return 0;
      }

      // send next msg
592
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
593
        // update sender ack
594 595
        pSender->ack = pMsg->ack;
        (pSender->seq)++;
M
Minghao Li 已提交
596
        snapshotSend(pSender);
597

M
Minghao Li 已提交
598 599
      } else if (pMsg->ack == pSender->seq - 1) {
        snapshotReSend(pSender);
600

M
Minghao Li 已提交
601 602
      } else {
        ASSERT(0);
603 604
      }
    }
M
Minghao Li 已提交
605 606
  } else {
    syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
607 608 609 610
  }

  return 0;
}