/* raft_replication.c */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>

#include "raft.h"
#include "raft_log.h"
#include "sync_raft_progress.h"
#include "syncInt.h"
#include "raft_replication.h"

static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress);
static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
                              SyncIndex prevIndex, SyncTerm prevTerm,
L
lichuang 已提交
25
                              SSyncRaftEntry *entries, int nEntry);
26

27
// maybeSendAppend sends an append RPC with new entries to the given peer,
28 29 30 31
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
32
bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) {
33
  assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
34
  SyncNodeId nodeId = progress->id;
35 36

  if (syncRaftProgressIsPaused(progress)) {
37 38
    syncInfo("node [%d:%d] paused", pRaft->selfGroupId, nodeId);
    return false;
39 40 41
  }

  SyncIndex nextIndex = syncRaftProgressNextIndex(progress);
42 43
  SSyncRaftEntry *entries;
  int nEntry;   
44 45 46
  SyncIndex prevIndex;
  SyncTerm prevTerm;

47 48 49
  prevIndex = nextIndex - 1;
  prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
  int ret = syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry);
50

51 52
  if (nEntry == 0 && !sendIfEmpty) {
    return false;
53
  }
54

55 56
  if (ret != 0 || prevTerm == SYNC_NON_TERM) {
    return sendSnapshot(pRaft, progress);
57 58
  }

59
  return sendAppendEntries(pRaft, progress, prevIndex, prevTerm, entries, nEntry);
60 61
}

62 63 64 65 66
// sendSnapshot is the fallback used by syncRaftMaybeSendAppend when log
// entries (or the preceding term) cannot be obtained for the peer.
//
// NOTE(review): this is currently a stub — no snapshot message is built or
// sent. It returns false if the peer is not recently active and true
// otherwise, so callers are told a message went out even though none did.
// TODO implement snapshot transmission.
static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
  (void)pRaft;  // unused until snapshot sending is implemented

  if (!syncRaftProgressRecentActive(progress)) {
    return false;
  }

  return true;
}

static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
                              SyncIndex prevIndex, SyncTerm prevTerm,
L
lichuang 已提交
71
                              SSyncRaftEntry *entries, int nEntry) {
L
lichuang 已提交
72 73 74 75
  SNodeInfo* pNode = syncRaftGetNodeById(pRaft, progress->id);
  if (pNode == NULL) {
    return false;
  }
76
  SyncIndex lastIndex;
L
lichuang 已提交
77
  SyncTerm logTerm = prevTerm;  
78 79 80 81 82 83

  SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term,
                                      prevIndex, prevTerm, pRaft->log->commitIndex,
                                      nEntry, entries);

  if (msg == NULL) {
84
    goto err_release_log;
85 86
  }

87 88 89 90 91 92
  if (nEntry != 0) {
    switch (progress->state) {
    // optimistically increase the next when in StateReplicate
    case PROGRESS_STATE_REPLICATE:
      lastIndex = entries[nEntry - 1].index;
      syncRaftProgressOptimisticNextIndex(progress, lastIndex);
L
lichuang 已提交
93
      syncRaftInflightAdd(progress->inflights, lastIndex);
94 95 96 97 98 99 100 101 102
      break;
    case PROGRESS_STATE_PROBE:
      progress->probeSent = true;
      break;
    default:
      syncFatal("[%d:%d] is sending append in unhandled state %s", 
                pRaft->selfGroupId, pRaft->selfId, syncRaftProgressStateString(progress));
      break;
    }
103
  }
104 105
  pRaft->io.send(msg, pNode);
  return true;
106 107

err_release_log:
108 109 110
  syncRaftLogRelease(pRaft->log, prevIndex + 1, entries, nEntry);
  return false;
}