syncSnapshot.c 18.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncSnapshot.h"
M
Minghao Li 已提交
17 18
#include "syncRaftStore.h"
#include "syncUtil.h"
M
Minghao Li 已提交
19
#include "wal.h"
M
Minghao Li 已提交
20

M
Minghao Li 已提交
21
static void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
M
Minghao Li 已提交
22
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver);
M
Minghao Li 已提交
23

M
Minghao Li 已提交
24
// Create the snapshot sender used for the replica at `replicaIndex`.
//
// Returns NULL (after logging) when the FSM lacks any of the three
// snapshot-read callbacks. Otherwise returns a heap-allocated, zeroed sender
// in the "not started" state whose snapshot info has been filled in through
// FpGetSnapshot. Release it with snapshotSenderDestroy().
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  // the FSM has to be able to open, read and close a snapshot stream
  if ((pSyncNode->pFsm->FpSnapshotStartRead == NULL) || (pSyncNode->pFsm->FpSnapshotStopRead == NULL) ||
      (pSyncNode->pFsm->FpSnapshotDoRead == NULL)) {
    sInfo("snapshotSenderCreate cannot create sender");
    return NULL;
  }

  SSyncSnapshotSender *pNew = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
  ASSERT(pNew != NULL);
  memset(pNew, 0, sizeof(*pNew));

  // session state: nothing sent, nothing acknowledged, no reader, no cached block
  pNew->start = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->pReader = NULL;
  pNew->pCurrentBlock = NULL;
  pNew->blockLen = 0;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;

  // owner, target replica and the term the sender was created in
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;
  pNew->term = pSyncNode->pRaftStore->currentTerm;

  // record the FSM's current snapshot info
  pNew->pSyncNode->pFsm->FpGetSnapshot(pNew->pSyncNode->pFsm, &(pNew->snapshot));

  pNew->finish = false;
  return pNew;
}
M
Minghao Li 已提交
53

M
Minghao Li 已提交
54 55 56 57 58
// Free a sender obtained from snapshotSenderCreate(). A NULL argument is ignored.
// Only the sender struct itself is released here.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }
  taosMemoryFree(pSender);
}
M
Minghao Li 已提交
59

M
Minghao Li 已提交
60
// begin send snapshot (current term, seq begin)
M
Minghao Li 已提交
61 62 63 64
static void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
  pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
M
Minghao Li 已提交
65

M
Minghao Li 已提交
66 67
  // open snapshot reader
  ASSERT(pSender->pReader == NULL);
M
Minghao Li 已提交
68 69 70
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
  ASSERT(ret == 0);

M
Minghao Li 已提交
71
  // get current snapshot info
M
Minghao Li 已提交
72 73
  pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));

M
Minghao Li 已提交
74
  // build begin msg
M
Minghao Li 已提交
75 76 77 78 79 80
  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
M
Minghao Li 已提交
81
  pMsg->seq = pSender->seq;  // SYNC_SNAPSHOT_SEQ_BEGIN
M
Minghao Li 已提交
82

M
Minghao Li 已提交
83
  // send
M
Minghao Li 已提交
84 85 86
  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
87 88

  char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
89
  sTrace("snapshot send begin seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
M
Minghao Li 已提交
90 91
  taosMemoryFree(msgStr);

M
Minghao Li 已提交
92 93 94
  syncSnapshotSendDestroy(pMsg);
}

M
Minghao Li 已提交
95
// when entry in snapshot, start sender
M
Minghao Li 已提交
96 97
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
  if (!(pSender->start)) {
M
Minghao Li 已提交
98
    // start
M
Minghao Li 已提交
99 100 101
    snapshotSenderDoStart(pSender);
    pSender->start = true;
  } else {
M
Minghao Li 已提交
102
    // already start
M
Minghao Li 已提交
103 104
    ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term);

M
Minghao Li 已提交
105
    // if current term is higher, need start again
M
Minghao Li 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118
    if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) {
      // force peer rollback
      SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId);
      pMsg->srcId = pSender->pSyncNode->myRaftId;
      pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
      pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
      pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
      pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
      pMsg->seq = SYNC_SNAPSHOT_SEQ_FORCE_CLOSE;

      SRpcMsg rpcMsg;
      syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
      syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
119 120

      char *msgStr = syncSnapshotSend2Str(pMsg);
M
Minghao Li 已提交
121
      sTrace("snapshot send force close seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
M
Minghao Li 已提交
122 123
      taosMemoryFree(msgStr);

M
Minghao Li 已提交
124 125 126 127 128
      syncSnapshotSendDestroy(pMsg);

      // close reader
      int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
      ASSERT(ret == 0);
M
Minghao Li 已提交
129
      pSender->pReader = NULL;
M
Minghao Li 已提交
130 131 132

      // start again
      snapshotSenderDoStart(pSender);
M
Minghao Li 已提交
133
      pSender->start = true;
M
Minghao Li 已提交
134
    } else {
M
Minghao Li 已提交
135 136
      // current term, do nothing
      ASSERT(pSender->pSyncNode->pRaftStore->currentTerm == pSender->term);
M
Minghao Li 已提交
137 138
    }
  }
M
Minghao Li 已提交
139 140 141 142

  char *s = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStart %s", s);
  taosMemoryFree(s);
M
Minghao Li 已提交
143 144 145
}

// Stop the sender: close the snapshot reader if one is open, drop the cached
// block if there is one, clear the `start` flag and log the resulting state.
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
  // close the reader
  if (pSender->pReader != NULL) {
    int32_t code = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    ASSERT(code == 0);
    pSender->pReader = NULL;
  }

  // release the block kept for resending
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
    pSender->blockLen = 0;
    pSender->pCurrentBlock = NULL;
  }

  pSender->start = false;

  char *dump = snapshotSender2Str(pSender);
  sInfo("snapshotSenderStop %s", dump);
  taosMemoryFree(dump);
}

M
Minghao Li 已提交
165 166
// when sender receiver ack, call this function to send msg from seq
// seq = ack + 1, already updated
M
Minghao Li 已提交
167 168 169 170
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  // free memory last time (seq - 1)
  if (pSender->pCurrentBlock != NULL) {
    taosMemoryFree(pSender->pCurrentBlock);
M
Minghao Li 已提交
171
    pSender->pCurrentBlock = NULL;
M
Minghao Li 已提交
172 173 174 175 176 177 178
    pSender->blockLen = 0;
  }

  // read data
  int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                           &(pSender->pCurrentBlock), &(pSender->blockLen));
  ASSERT(ret == 0);
M
Minghao Li 已提交
179 180 181 182 183 184
  if (pSender->blockLen > 0) {
    // has read data
  } else {
    // read finish
    pSender->seq = SYNC_SNAPSHOT_SEQ_END;
  }
M
Minghao Li 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197

  SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->seq = pSender->seq;
  memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);

  SRpcMsg rpcMsg;
  syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
M
Minghao Li 已提交
198 199 200

  char *msgStr = syncSnapshotSend2Str(pMsg);
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
M
Minghao Li 已提交
201
    sTrace("snapshot send finish seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
M
Minghao Li 已提交
202
  } else {
M
Minghao Li 已提交
203
    sTrace("snapshot send sending seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
M
Minghao Li 已提交
204 205 206
  }
  taosMemoryFree(msgStr);

M
Minghao Li 已提交
207 208 209 210 211
  syncSnapshotSendDestroy(pMsg);

  return 0;
}

M
Minghao Li 已提交
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
// Send the cached block (pSender->pCurrentBlock) again with the current seq.
// Does nothing when no block is cached. Always returns 0.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  if (pSender->pCurrentBlock == NULL) {
    // nothing cached, nothing to repeat
    return 0;
  }

  SyncSnapshotSend *pAgain = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId);
  pAgain->srcId = pSender->pSyncNode->myRaftId;
  pAgain->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
  pAgain->term = pSender->pSyncNode->pRaftStore->currentTerm;
  pAgain->lastIndex = pSender->snapshot.lastApplyIndex;
  pAgain->lastTerm = pSender->snapshot.lastApplyTerm;
  pAgain->seq = pSender->seq;
  memcpy(pAgain->data, pSender->pCurrentBlock, pSender->blockLen);

  SRpcMsg rpc;
  syncSnapshotSend2RpcMsg(pAgain, &rpc);
  syncNodeSendMsgById(&(pAgain->destId), pSender->pSyncNode, &rpc);

  char *text = syncSnapshotSend2Str(pAgain);
  sTrace("snapshot send resend seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, text);
  taosMemoryFree(text);

  syncSnapshotSendDestroy(pAgain);
  return 0;
}

M
Minghao Li 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
// Describe a sender as a cJSON tree for logging.
//
// The result is an object with one member, "SSyncSnapshotSender", holding the
// sender's fields; that member is an empty object when pSender is NULL.
// The returned tree belongs to the caller (snapshotSender2Str() frees it with
// cJSON_Delete()).
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
  char   text[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pSender != NULL) {
    // transfer progress
    cJSON_AddNumberToObject(pFields, "start", pSender->start);
    cJSON_AddNumberToObject(pFields, "seq", pSender->seq);
    cJSON_AddNumberToObject(pFields, "ack", pSender->ack);

    // pointers are rendered as strings
    snprintf(text, sizeof(text), "%p", pSender->pReader);
    cJSON_AddStringToObject(pFields, "pReader", text);

    snprintf(text, sizeof(text), "%p", pSender->pCurrentBlock);
    cJSON_AddStringToObject(pFields, "pCurrentBlock", text);
    cJSON_AddNumberToObject(pFields, "blockLen", pSender->blockLen);

    // content of the cached block, in both print formats
    // (the first one is added under the "pCurrentBlock" key a second time)
    if (pSender->pCurrentBlock != NULL) {
      char *bin = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock", bin);
      taosMemoryFree(bin);

      char *bin2 = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
      cJSON_AddStringToObject(pFields, "pCurrentBlock2", bin2);
      taosMemoryFree(bin2);
    }

    // nested "snapshot" object
    cJSON *pSnap = cJSON_CreateObject();
    snprintf(text, sizeof(text), "%lu", pSender->snapshot.lastApplyIndex);
    cJSON_AddStringToObject(pSnap, "lastApplyIndex", text);
    snprintf(text, sizeof(text), "%lu", pSender->snapshot.lastApplyTerm);
    cJSON_AddStringToObject(pSnap, "lastApplyTerm", text);
    cJSON_AddItemToObject(pFields, "snapshot", pSnap);

    snprintf(text, sizeof(text), "%lu", pSender->sendingMS);
    cJSON_AddStringToObject(pFields, "sendingMS", text);
    snprintf(text, sizeof(text), "%p", pSender->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", text);
    cJSON_AddNumberToObject(pFields, "replicaIndex", pSender->replicaIndex);
    snprintf(text, sizeof(text), "%lu", pSender->term);
    cJSON_AddStringToObject(pFields, "term", text);
    cJSON_AddNumberToObject(pFields, "finish", pSender->finish);
  }

  // wrap the fields under the type name
  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotSender", pFields);
  return pWrapper;
}

// Render a sender as printed JSON text (see snapshotSender2Json()).
// The intermediate cJSON tree is deleted here; the returned string is handed
// to the caller (callers in this file release it with taosMemoryFree()).
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
  cJSON *pTree = snapshotSender2Json(pSender);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

// -------------------------------------
293
// Create the snapshot receiver associated with the replica at `replicaIndex`.
//
// Returns NULL (after logging) when the FSM lacks any of the three
// snapshot-write callbacks. Otherwise returns a heap-allocated, zeroed receiver
// in the "not started" state with no writer open. Release it with
// snapshotReceiverDestroy().
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);

  // Must start out as NULL: this is the value returned when the callbacks are
  // missing. Leaving it uninitialized would hand the caller an indeterminate pointer.
  SSyncSnapshotReceiver *pReceiver = NULL;
  if (condition) {
    pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
    ASSERT(pReceiver != NULL);
    memset(pReceiver, 0, sizeof(*pReceiver));

    pReceiver->start = false;
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
    pReceiver->pWriter = NULL;
    pReceiver->pSyncNode = pSyncNode;
    pReceiver->replicaIndex = replicaIndex;
    pReceiver->term = pSyncNode->pRaftStore->currentTerm;
  } else {
    sInfo("snapshotReceiverCreate cannot create receiver");
  }

  return pReceiver;
}
M
Minghao Li 已提交
315

316 317 318 319 320
// Free a receiver obtained from snapshotReceiverCreate(). A NULL argument is ignored.
// Only the receiver struct itself is released here.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }
  taosMemoryFree(pReceiver);
}
M
Minghao Li 已提交
321

M
Minghao Li 已提交
322 323 324 325 326 327 328 329 330 331 332 333
// Begin a receive session in the current term: reset ack to
// SYNC_SNAPSHOT_SEQ_BEGIN and open an FSM snapshot writer.
// No writer may be open when this is called.
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver) {
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;

  ASSERT(pReceiver->pWriter == NULL);
  int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
  ASSERT(code == 0);
}

// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
334 335
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) {
  if (!(pReceiver->start)) {
M
Minghao Li 已提交
336 337
    // start
    snapshotReceiverDoStart(pReceiver);
M
Minghao Li 已提交
338
    pReceiver->start = true;
M
Minghao Li 已提交
339

340
  } else {
M
Minghao Li 已提交
341 342 343 344 345 346 347 348 349 350
    // already start

    // force close, abandon incomplete data
    int32_t ret =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(ret == 0);
    pReceiver->pWriter = NULL;

    // start again
    snapshotReceiverDoStart(pReceiver);
M
Minghao Li 已提交
351
    pReceiver->start = true;
M
Minghao Li 已提交
352

353 354
    ASSERT(0);
  }
M
Minghao Li 已提交
355 356 357 358

  char *s = snapshotReceiver2Str(pReceiver);
  sInfo("snapshotReceiverStart %s", s);
  taosMemoryFree(s);
359
}
M
Minghao Li 已提交
360

361
// Stop the receiver: if a writer is still open, close it with the final
// argument of FpSnapshotStopWrite set to false, then clear the `start` flag
// and log the resulting state.
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver->pWriter != NULL) {
    int32_t code =
        pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
    ASSERT(code == 0);
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;

  char *dump = snapshotReceiver2Str(pReceiver);
  sInfo("snapshotReceiverStop %s", dump);
  taosMemoryFree(dump);
}

// Describe a receiver as a cJSON tree for logging.
//
// The result is an object with one member, "SSyncSnapshotReceiver", holding the
// receiver's fields; that member is an empty object when pReceiver is NULL.
// The returned tree belongs to the caller (snapshotReceiver2Str() frees it with
// cJSON_Delete()).
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
  char   text[128];
  cJSON *pFields = cJSON_CreateObject();

  if (pReceiver != NULL) {
    cJSON_AddNumberToObject(pFields, "start", pReceiver->start);
    cJSON_AddNumberToObject(pFields, "ack", pReceiver->ack);

    // pointers are rendered as strings
    snprintf(text, sizeof(text), "%p", pReceiver->pWriter);
    cJSON_AddStringToObject(pFields, "pWriter", text);

    snprintf(text, sizeof(text), "%p", pReceiver->pSyncNode);
    cJSON_AddStringToObject(pFields, "pSyncNode", text);
    cJSON_AddNumberToObject(pFields, "replicaIndex", pReceiver->replicaIndex);
    snprintf(text, sizeof(text), "%lu", pReceiver->term);
    cJSON_AddStringToObject(pFields, "term", text);
  }

  // wrap the fields under the type name
  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SSyncSnapshotReceiver", pFields);
  return pWrapper;
}

// Render a receiver as printed JSON text (see snapshotReceiver2Json()).
// The intermediate cJSON tree is deleted here; the returned string is handed
// to the caller (callers in this file release it with taosMemoryFree()).
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
  cJSON *pTree = snapshotReceiver2Json(pReceiver);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}
M
Minghao Li 已提交
405

406
// receiver do something
M
Minghao Li 已提交
407
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
M
Minghao Li 已提交
408
  // get receiver
M
Minghao Li 已提交
409 410 411 412 413 414
  SSyncSnapshotReceiver *pReceiver = NULL;
  for (int i = 0; i < pSyncNode->replicaNum; ++i) {
    if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) {
      pReceiver = (pSyncNode->receivers)[i];
    }
  }
M
Minghao Li 已提交
415 416 417 418 419

  // add new replica
  if (pReceiver == NULL) {
    pReceiver = pSyncNode->pNewNodeReceiver;
  }
M
Minghao Li 已提交
420

M
Minghao Li 已提交
421 422
  bool needRsp = false;

423 424
  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
M
Minghao Li 已提交
425 426 427 428 429 430 431
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
      if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
        // begin
        snapshotReceiverStart(pReceiver);
        pReceiver->ack = pMsg->seq;
        needRsp = true;

M
Minghao Li 已提交
432 433 434 435
        char *msgStr = syncSnapshotSend2Str(pMsg);
        sTrace("snapshot recv begin ack:%d recv msg:%s", pReceiver->ack, msgStr);
        taosMemoryFree(msgStr);

M
Minghao Li 已提交
436 437
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
        // end, finish FSM
438
        pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
M
Minghao Li 已提交
439
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
M
Minghao Li 已提交
440 441 442 443

        walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex);
        sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex);

M
Minghao Li 已提交
444
        pReceiver->pWriter = NULL;
M
Minghao Li 已提交
445
        snapshotReceiverStop(pReceiver);
M
Minghao Li 已提交
446
        pReceiver->ack = pMsg->seq;
M
Minghao Li 已提交
447
        needRsp = true;
M
Minghao Li 已提交
448

M
Minghao Li 已提交
449 450 451 452
        char *msgStr = syncSnapshotSend2Str(pMsg);
        sTrace("snapshot recv end ack:%d recv msg:%s", pReceiver->ack, msgStr);
        taosMemoryFree(msgStr);

M
Minghao Li 已提交
453 454 455 456 457
      } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
        pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
        snapshotReceiverStop(pReceiver);
        needRsp = false;

M
Minghao Li 已提交
458 459 460 461
        char *msgStr = syncSnapshotSend2Str(pMsg);
        sTrace("snapshot recv force close ack:%d recv msg:%s", pReceiver->ack, msgStr);
        taosMemoryFree(msgStr);

M
Minghao Li 已提交
462 463 464 465 466 467 468 469
      } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
        // transfering
        if (pMsg->seq == pReceiver->ack + 1) {
          pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
          pReceiver->ack = pMsg->seq;
        }
        needRsp = true;

M
Minghao Li 已提交
470 471 472 473
        char *msgStr = syncSnapshotSend2Str(pMsg);
        sTrace("snapshot recv receiving ack:%d recv msg:%s", pReceiver->ack, msgStr);
        taosMemoryFree(msgStr);

M
Minghao Li 已提交
474 475
      } else {
        ASSERT(0);
476
      }
M
Minghao Li 已提交
477

M
Minghao Li 已提交
478 479 480 481 482 483 484 485 486 487 488 489
      if (needRsp) {
        SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
        pRspMsg->srcId = pSyncNode->myRaftId;
        pRspMsg->destId = pMsg->srcId;
        pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
        pRspMsg->lastIndex = pMsg->lastIndex;
        pRspMsg->lastTerm = pMsg->lastTerm;
        pRspMsg->ack = pReceiver->ack;

        SRpcMsg rpcMsg;
        syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
        syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
M
Minghao Li 已提交
490

M
Minghao Li 已提交
491 492
        syncSnapshotRspDestroy(pRspMsg);
      }
M
Minghao Li 已提交
493
    }
M
Minghao Li 已提交
494 495
  } else {
    syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
M
Minghao Li 已提交
496
  }
M
Minghao Li 已提交
497

M
Minghao Li 已提交
498 499 500
  return 0;
}

M
Minghao Li 已提交
501 502
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
503
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
M
Minghao Li 已提交
504
  // get sender
505 506 507 508 509 510 511 512 513 514 515
  SSyncSnapshotSender *pSender = NULL;
  for (int i = 0; i < pSyncNode->replicaNum; ++i) {
    if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) {
      pSender = (pSyncNode->senders)[i];
    }
  }
  ASSERT(pSender != NULL);

  // state, term, seq/ack
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
M
Minghao Li 已提交
516 517
      // receiver ack is finish, close sender
      if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
M
Minghao Li 已提交
518
        pSender->finish = true;
M
Minghao Li 已提交
519 520 521 522 523
        snapshotSenderStop(pSender);
        return 0;
      }

      // send next msg
524
      if (pMsg->ack == pSender->seq) {
M
Minghao Li 已提交
525
        // update sender ack
526 527
        pSender->ack = pMsg->ack;
        (pSender->seq)++;
M
Minghao Li 已提交
528
        snapshotSend(pSender);
M
Minghao Li 已提交
529 530 531 532
      } else if (pMsg->ack == pSender->seq - 1) {
        snapshotReSend(pSender);
      } else {
        ASSERT(0);
533 534
      }
    }
M
Minghao Li 已提交
535 536
  } else {
    syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
537 538 539 540
  }

  return 0;
}