raft_replication.c 3.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
/*
 * Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "raft.h"
#include "raft_log.h"
#include "raft_progress.h"
#include "raft_replication.h"

static int sendSnapshot(SSyncRaft* pRaft, int i);
static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm term);

int syncRaftReplicate(SSyncRaft* pRaft, int i) {
  assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
  assert(i >= 0 && i < pRaft->leaderState.nProgress);

  SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
  if (syncRaftProgressIsPaused(progress)) {
    syncInfo("node %d paused", nodeId);
    return 0;
  }

  SyncIndex nextIndex = syncRaftProgressNextIndex(progress);
  SyncIndex prevIndex = nextIndex - 1;
  SyncTerm prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);

  if (prevTerm == SYNC_NON_TERM && !syncRaftProgressInSnapshot(progress)) {
    goto send_snapshot;
  }

send_snapshot:
  if (syncRaftProgressRecentActive(progress)) {
    /* Only send a snapshot when we have heard from the server */
    return sendSnapshot(pRaft, i);
  } else {
    /* Send empty AppendEntries RPC when we haven't heard from the server */
    prevIndex = syncRaftLogLastIndex(pRaft->log);
    prevTerm  = syncRaftLogLastTerm(pRaft->log);
    return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
  }
}

static int sendSnapshot(SSyncRaft* pRaft, int i) {
  return 0;
}

static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) {
  SyncIndex nextIndex = prevIndex + 1;
  SSyncRaftEntry *entries;
  int nEntry;
  SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]);
  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
  syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry);

  SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term,
                                      prevIndex, prevTerm, pRaft->log->commitIndex,
                                      nEntry, entries);

  if (msg == NULL) {
    return 0;
  }

  pRaft->io.send(msg, pNode);

  if (syncRaftProgressInReplicate(progress)) {
    SyncIndex lastIndex = nextIndex + nEntry;
    syncRaftProgressOptimisticNextIndex(progress, lastIndex);
    syncRaftInflightAdd(&progress->inflights, lastIndex);
  } else if (syncRaftProgressInProbe(progress)) {
    syncRaftProgressPause(progress);
  } else {

  }

  syncRaftProgressUpdateSendTick(progress, pRaft->currentTick);

  return 0;
}