/* raft_replication.c */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "raft.h"
#include "raft_log.h"
#include "raft_progress.h"
#include "raft_replication.h"

/* Forward declarations of the file-local helpers defined later in this file. */
static int sendSnapshot(SSyncRaft* pRaft, int i);
static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm);

/**
 * Entry point for replicating the leader's log to the peer at position `i`.
 *
 * NOTE: the whole implementation is currently compiled out with `#if 0`, so
 * this function is a stub: it ignores its arguments and always returns 0.
 *
 * The disabled implementation, once enabled, would:
 *   - assert that this node is the leader and that `i` is a valid index into
 *     pRaft->leaderState.progress;
 *   - return 0 without sending anything when the peer's progress is paused;
 *   - derive prevIndex/prevTerm from the peer's next index and send an
 *     AppendEntries message, or fall back to sending a snapshot when the
 *     required entry is no longer in the log.
 *
 * @param pRaft  raft instance
 * @param i      position of the target peer (used to index the progress and
 *               node-info arrays in the disabled code)
 * @return       0 in the current stub; the disabled code returns whatever
 *               sendAppendEntries()/sendSnapshot() return
 **/
int syncRaftReplicate(SSyncRaft* pRaft, int i) {
#if 0
  assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
  assert(i >= 0 && i < pRaft->leaderState.nProgress);

  SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
  if (syncRaftProgressIsPaused(progress)) {
    syncInfo("node %d paused", nodeId);
    return 0;
  }

  SyncIndex nextIndex = syncRaftProgressNextIndex(progress);
  SyncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log);
  bool inSnapshot = syncRaftProgressInSnapshot(progress);
  SyncIndex prevIndex;
  SyncTerm prevTerm;

  /**
   * From Section 3.5:
   *
   *   When sending an AppendEntries RPC, the leader includes the index and
   *   term of the entry in its log that immediately precedes the new
   *   entries. If the follower does not find an entry in its log with the
   *   same index and term, then it refuses the new entries. The consistency
   *   check acts as an induction step: the initial empty state of the logs
   *   satisfies the Log Matching Property, and the consistency check
   *   preserves the Log Matching Property whenever logs are extended. As a
   *   result, whenever AppendEntries returns successfully, the leader knows
   *   that the follower's log is identical to its own log up through the new
   *   entries (Log Matching Property in Figure 3.2).
   **/
  if (nextIndex == 1) {
    /**
     * We're including the very first entry, so prevIndex and prevTerm are
     * null. If the first entry is not available anymore, send the last
     * snapshot if we're not already sending one.
     **/
    if (snapshotIndex > 0 && !inSnapshot) {
      goto send_snapshot;
    }

    // otherwise send append entries from start
    prevIndex = 0;
    prevTerm = 0;
  } else {
    /**
     * Set prevIndex and prevTerm to the index and term of the entry at
     * nextIndex - 1.
     **/
    prevIndex = nextIndex - 1;
    prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
    /**
     * If the entry is not anymore in our log, send the last snapshot if we're
     * not doing so already.
     **/
    if (prevTerm == SYNC_NON_TERM && !inSnapshot) {
      goto send_snapshot;
    }
  }

  /* Send empty AppendEntries RPC when installing a snapshot */
  if (inSnapshot) {
    prevIndex = syncRaftLogLastIndex(pRaft->log);
    prevTerm = syncRaftLogLastTerm(pRaft->log);
  }

  return sendAppendEntries(pRaft, i, prevIndex, prevTerm);

send_snapshot:
  if (syncRaftProgressRecentActive(progress)) {
    /* Only send a snapshot when we have heard from the server */
    return sendSnapshot(pRaft, i);
  } else {
    /* Send empty AppendEntries RPC when we haven't heard from the server */
    prevIndex = syncRaftLogLastIndex(pRaft->log);
    prevTerm  = syncRaftLogLastTerm(pRaft->log);
    return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
  }
#endif
  return 0;
}

/**
 * Placeholder for sending a snapshot to the peer at position `i`.
 *
 * Not implemented yet: both arguments are ignored and the function
 * unconditionally returns 0.
 **/
static int sendSnapshot(SSyncRaft* pRaft, int i) {
  (void)pRaft;
  (void)i;
  return 0;
}

/**
 * Send an AppendEntries message to the peer at position `i`.
 *
 * NOTE: the whole implementation is currently compiled out with `#if 0`, so
 * this function is a stub: it ignores its arguments and always returns 0.
 *
 * The disabled implementation, once enabled, would acquire entries from the
 * log starting at prevIndex + 1 (bounded by pRaft->maxMsgSize), build an
 * append message carrying prevIndex/prevTerm and the log's commit index, hand
 * it to pRaft->io.send, and then update the peer's progress bookkeeping.
 *
 * @param pRaft      raft instance
 * @param i          position of the target peer
 * @param prevIndex  index of the entry immediately preceding the new entries
 * @param prevTerm   term of the entry at prevIndex
 * @return           0 in the current stub
 **/
static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) {
#if 0
  SyncIndex nextIndex = prevIndex + 1;
  SSyncRaftEntry *entries;
  int nEntry;
  SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]);
  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
  syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry);

  SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term,
                                      prevIndex, prevTerm, pRaft->log->commitIndex,
                                      nEntry, entries);

  if (msg == NULL) {
    goto err_release_log;
  }

  pRaft->io.send(msg, pNode);

  if (syncRaftProgressInReplicate(progress)) {
    /*
     * NOTE(review): if the acquired entries start at nextIndex, the index of
     * the last entry sent would be nextIndex + nEntry - 1, and nEntry may be
     * 0 -- confirm the intended value before enabling this code.
     */
    SyncIndex lastIndex = nextIndex + nEntry;
    syncRaftProgressOptimisticNextIndex(progress, lastIndex);
    syncRaftInflightAdd(&progress->inflights, lastIndex);
  } else if (syncRaftProgressInProbe(progress)) {
    syncRaftProgressPause(progress);
  } else {
    /* any other progress state: no bookkeeping is done here */
  }

  syncRaftProgressUpdateSendTick(progress, pRaft->currentTick);

  return 0;

err_release_log:
  /*
   * NOTE(review): this error path falls through to the `return 0` after the
   * #endif, so a failed message allocation is reported as success -- confirm
   * the intended error code before enabling this code.
   */
  syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry);
#endif
  return 0;
}