raft.c 6.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "raft.h"
17
#include "raft_configuration.h"
18
#include "raft_log.h"
19
#include "raft_replication.h"
20
#include "sync_raft_progress_tracker.h"
21 22 23 24
#include "syncInt.h"

#define RAFT_READ_LOG_MAX_NUM 100

25 26 27 28
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);

29 30 31 32 33 34 35 36 37 38
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
  SSyncNode* pNode = pRaft->pNode;
  SSyncServerState serverState;
  SStateManager* stateManager;
  SSyncLogStore* logStore;
  SSyncFSM* fsm;
  SyncIndex initIndex = pInfo->snapshotIndex;
  SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM];
  int nBuf, limit, i;
  
39 40
  memset(pRaft, 0, sizeof(SSyncRaft));

41 42 43
  memcpy(&pRaft->fsm, &pInfo->fsm, sizeof(SSyncFSM));
  memcpy(&pRaft->logStore, &pInfo->logStore, sizeof(SSyncLogStore));
  memcpy(&pRaft->stateManager, &pInfo->stateManager, sizeof(SStateManager));
44

45 46 47 48
  stateManager = &(pRaft->stateManager);
  logStore = &(pRaft->logStore);
  fsm = &(pRaft->fsm);

49 50 51 52 53 54
  // init progress tracker
  pRaft->tracker = syncRaftOpenProgressTracker();
  if (pRaft->tracker == NULL) {
    return -1;
  }

55 56 57 58
  // open raft log
  if ((pRaft->log = syncRaftLogOpen()) == NULL) {
    return -1;
  }
59 60 61 62 63 64 65
  // read server state
  if (stateManager->readServerState(stateManager, &serverState) != 0) {
    syncError("readServerState for vgid %d fail", pInfo->vgId);
    return -1;
  }
  assert(initIndex <= serverState.commitIndex);

66
  // restore fsm state from snapshot index + 1 until commitIndex
67
  ++initIndex;
68 69
  while (initIndex <= serverState.commitIndex) {
    limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex + 1);
70 71 72 73 74 75 76 77 78 79 80 81 82 83

    if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) {
      return -1;
    }
    assert(limit == nBuf);
  
    for (i = 0; i < limit; ++i) {
      fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL);
      free(buffer[i].data);
    }
    initIndex += nBuf;
  }
  assert(initIndex == serverState.commitIndex);

84
  //pRaft->heartbeatTimeoutTick = 1;
85

86
  syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
87

88 89
  pRaft->selfIndex = pRaft->cluster.selfIndex;

90 91
  syncInfo("[%d:%d] restore vgid %d state: snapshot index success", 
    pRaft->selfGroupId, pRaft->selfId, pInfo->vgId);
92 93 94
  return 0;
}

95
int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
96 97
  syncDebug("from %d, type:%d, term:%" PRId64 ", state:%d",
    pMsg->from, pMsg->msgType, pMsg->term, pRaft->state);
98

99 100 101 102 103
  if (preHandleMessage(pRaft, pMsg)) {
    syncFreeMessage(pMsg);
    return 0;
  }

104
  ESyncRaftMessageType msgType = pMsg->msgType;
105
  if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
106
    syncRaftHandleElectionMessage(pRaft, pMsg);
107
  } else if (msgType == RAFT_MSG_VOTE) {
108
    syncRaftHandleVoteMessage(pRaft, pMsg);
109 110 111 112
  } else {
    pRaft->stepFp(pRaft, pMsg);
  }

113 114 115 116 117
  syncFreeMessage(pMsg);
  return 0;
}

int32_t syncRaftTick(SSyncRaft* pRaft) {
118
  pRaft->currentTick += 1;
119
  return 0;
120 121
}

122
/**
123
 * pre-handle message, return true means no need to continue
124 125 126 127 128 129 130 131 132 133
 * Handle the message term, which may result in our stepping down to a follower.
 **/
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
  // local message?
  if (pMsg->term == 0) {
    return false;
  }

  if (pMsg->term > pRaft->term) {
    return preHandleNewTermMessage(pRaft, pMsg);
134 135
  } else if (pMsg->term < pRaft->term) {
    return preHandleOldTermMessage(pRaft, pMsg);
136 137
  }

138
  return false;
139 140 141 142
}

static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
  SyncNodeId leaderId = pMsg->from;
143
  ESyncRaftMessageType msgType = pMsg->msgType;
144

145
  if (msgType == RAFT_MSG_VOTE) {
146
    // TODO
147 148 149
    leaderId = SYNC_NON_NODE_ID;
  }

150
  if (syncIsPreVoteMsg(pMsg)) {
151
    // Never change our term in response to a PreVote
152
  } else if (syncIsPreVoteRespMsg(pMsg) && !pMsg->voteResp.rejected) {
153 154 155 156 157 158 159 160
		/**
     * We send pre-vote requests with a term in our future. If the
		 * pre-vote is granted, we will increment our term when we get a
		 * quorum. If it is not, the term comes from the node that
		 * rejected our vote so we should become a follower at the new
		 * term.
     **/
  } else {
161 162
    syncInfo("[%d:%d] [term:%" PRId64 "] received a %d message with higher term from %d [term:%" PRId64 "]",
      pRaft->selfGroupId, pRaft->selfId, pRaft->term, msgType, pMsg->from, pMsg->term);
163 164 165 166 167 168 169
    syncRaftBecomeFollower(pRaft, pMsg->term, leaderId);
  }

  return false;
}

static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
  if (pRaft->checkQuorum && pMsg->msgType == RAFT_MSG_APPEND) {
		/**
     * We have received messages from a leader at a lower term. It is possible
		 * that these messages were simply delayed in the network, but this could
		 * also mean that this node has advanced its term number during a network
		 * partition, and it is now unable to either win an election or to rejoin
	   * the majority on the old term. If checkQuorum is false, this will be
		 * handled by incrementing term numbers in response to MsgVote with a
		 * higher term, but if checkQuorum is true we may not advance the term on
		 * MsgVote and must generate other messages to advance the term. The net
		 * result of these two features is to minimize the disruption caused by
		 * nodes that have been removed from the cluster's configuration: a
		 * removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
		 * but it will not receive MsgApp or MsgHeartbeat, so it will not create
		 * disruptive term increases
    **/
    int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
    if (peerIndex < 0) {
      return true;
    }
    SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
    if (msg == NULL) {
      return true;
    }

    pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex]));
  } else {
    // ignore other cases
    syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]",
      pRaft->selfGroupId, pRaft->selfId, pRaft->term, pMsg->msgType, pMsg->from, pMsg->term);    
  }

202
  return true;
203
}