sync_raft_impl.c 11.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "raft.h"
L
lichuang 已提交
17
#include "sync_raft_impl.h"
18 19 20 21 22 23 24 25 26 27
#include "raft_log.h"
#include "raft_replication.h"
#include "sync_raft_progress_tracker.h"
#include "syncInt.h"

/* Role-transition hook and per-role message (step) handlers. */
static int convertClear(SSyncRaft* pRaft);
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);

/* Size accounting for uncommitted proposals. */
static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);

/* Trigger I/O for newly appended entries or heartbeats. */
static int triggerAll(SSyncRaft* pRaft);

/* Per-role tick handlers installed in pRaft->tickFp. */
static void tickElection(SSyncRaft* pRaft);
static void tickHeartbeat(SSyncRaft* pRaft);

static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);

static void abortLeaderTransfer(SSyncRaft* pRaft);

static void resetRaft(SSyncRaft* pRaft, SyncTerm term);

/**
 * Transition this node to the follower role.
 *
 * Installs the follower step handler and the election tick handler, resets
 * per-term state through resetRaft() (voteFor is cleared only if the term
 * changes) and records `leaderId` as the leader of the new term.
 *
 * @param pRaft    raft instance to transition
 * @param term     term to adopt
 * @param leaderId leader to remember for this term
 */
void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) {
  convertClear(pRaft);

  pRaft->stepFp = stepFollower;
  resetRaft(pRaft, term);
  pRaft->tickFp = tickElection;
  // resetRaft() clears leaderId, so it has to be assigned afterwards.
  pRaft->leaderId = leaderId;
  pRaft->state = TAOS_SYNC_STATE_FOLLOWER;
  syncInfo("[%d:%d] became follower at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}

/**
 * Transition this node to the pre-candidate role (PreVote phase).
 *
 * Becoming a pre-candidate changes our step functions and state, but
 * doesn't change anything else. In particular it does not increase
 * pRaft->term or change pRaft->voteFor.
 *
 * The pre-candidate shares TAOS_SYNC_STATE_CANDIDATE with the real candidate
 * role and is told apart by candidateState.inPreVote.
 */
void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
  convertClear(pRaft);

  pRaft->stepFp = stepCandidate;
  // Discard votes recorded by an earlier round. resetRaft() is not called
  // here, so without this a repeated pre-vote round in the same term would
  // tally stale responses.
  syncRaftResetVotes(pRaft->tracker);
  pRaft->tickFp = tickElection;
  // While campaigning we do not follow any leader.
  pRaft->leaderId = SYNC_NON_NODE_ID;
  pRaft->state = TAOS_SYNC_STATE_CANDIDATE;
  pRaft->candidateState.inPreVote = true;
  syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}

/**
 * Transition this node to the (real, non-pre-vote) candidate role.
 *
 * Starts a new term (term + 1) through resetRaft(), then votes for itself.
 * The self-vote must come after resetRaft(), which clears voteFor when the
 * term changes.
 */
void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
  convertClear(pRaft);

  pRaft->candidateState.inPreVote = false;
  pRaft->stepFp = stepCandidate;
  // Becoming a candidate increments the term.
  resetRaft(pRaft, pRaft->term + 1);
  pRaft->tickFp = tickElection;
  pRaft->voteFor = pRaft->selfId;
  pRaft->state = TAOS_SYNC_STATE_CANDIDATE;
  syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}

/**
 * Transition this node to the leader role for the current term.
 *
 * Must not be called from the follower state (asserted). The function:
 * - installs the leader step handler and the heartbeat tick handler;
 * - resets per-term state and records this node as leader;
 * - moves its own progress into replicate mode;
 * - appends an empty (no-op) entry for the new term.
 */
void syncRaftBecomeLeader(SSyncRaft* pRaft) {
  assert(pRaft->state != TAOS_SYNC_STATE_FOLLOWER);

  pRaft->stepFp = stepLeader;
  resetRaft(pRaft, pRaft->term);
  // A leader drives heartbeats rather than election timeouts. Without this,
  // the tickElection handler inherited from the candidate role stays installed.
  pRaft->tickFp = tickHeartbeat;
  // resetRaft() cleared leaderId; the leader of this term is this node.
  pRaft->leaderId = pRaft->selfId;
  pRaft->state = TAOS_SYNC_STATE_LEADER;

  SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId);
  assert(progress != NULL);
  // Followers enter replicate mode when they've been successfully probed
  // (perhaps after having received a snapshot as a result). The leader is
  // trivially in this state. resetRaft() is expected to have initialized this
  // progress with the last index already (via syncRaftResetProgress).
  syncRaftProgressBecomeReplicate(progress);

  // Conservatively set the pendingConfigIndex to the last index in the
  // log. There may or may not be a pending config change, but it's
  // safe to delay any future proposals until we commit all our
  // pending log entries, and scanning the entire tail of the log
  // could be expensive.
  pRaft->pendingConfigIndex = syncRaftLogLastIndex(pRaft->log);

  // After becoming leader, append an empty (no-op) entry for the new term.
  SSyncRaftEntry* entry = malloc(sizeof(*entry));
  if (entry == NULL) {
    return;
  }
  *entry = (SSyncRaftEntry) {
    .buffer = (SSyncBuffer) {
      .data = NULL,
      .len = 0,
    }
  };
  // NOTE(review): ownership of `entry` after syncRaftLogAppend() is not
  // visible here -- confirm whether the log keeps it or it must be freed.
  appendEntries(pRaft, entry, 1);
  // TODO: broadcast the new entry (syncRaftTriggerHeartbeat is not called yet).

  syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}

/**
 * Public entry point that delegates to the file-local triggerAll(), which
 * triggers I/O for newly appended entries or heartbeats. Its status code is
 * discarded.
 */
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) {
  (void)triggerAll(pRaft);
}

/**
 * Draw a new randomized election timeout, in ticks.
 *
 * The value is 3 plus taosRand() modulo 4, i.e. within [3, 6] ticks
 * (assuming taosRand() returns non-negative values).
 */
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
  enum { kMinElectionTicks = 3, kElectionTickSpread = 4 };
  pRaft->randomizedElectionTimeout = kMinElectionTicks + taosRand() % kElectionTickSpread;
}

/**
 * Whether this node has a real id (not SYNC_NON_NODE_ID). tickElection()
 * only starts an election on a promotable node.
 */
bool syncRaftIsPromotable(SSyncRaft* pRaft) {
  const bool hasSelfId = (pRaft->selfId != SYNC_NON_NODE_ID);
  return hasSelfId;
}

/**
 * Whether the ticks elapsed since the last election reset have reached the
 * current randomized election timeout.
 */
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
  return pRaft->randomizedElectionTimeout <= pRaft->electionElapsed;
}

/**
 * Number of votes needed for a majority.
 *
 * TODO: currently a stub that always returns 0. The former implementation
 * computed `replica / 2 + 1` from a cluster descriptor. The vote-counting
 * path (syncRaftPollVote) now delegates to the progress tracker's
 * syncRaftTallyVotes() instead.
 */
int syncRaftQuorum(SSyncRaft* pRaft) {
  (void)pRaft;
  return 0;
}

142
/**
 * Record a (pre-)vote response from node `id` and tally the election.
 *
 * @param pRaft    raft instance running the election
 * @param id       node the response came from
 * @param preVote  whether the response belongs to a pre-vote round (logging only)
 * @param grant    true if the vote was granted, false if it was rejected
 * @param rejected out-parameter filled by syncRaftTallyVotes()
 *                 (presumably the rejection count)
 * @param granted  out-parameter filled by syncRaftTallyVotes()
 *                 (presumably the grant count)
 * @return the result of syncRaftTallyVotes() on the tracker
 *
 * A response from a node that syncRaftGetNodeById() does not know is not
 * recorded. The current tally is still returned, so the caller always gets
 * a real ESyncRaftVoteResult and the out-parameters are filled.
 */
ESyncRaftVoteResult  syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, 
                                      bool preVote, bool grant, 
                                      int* rejected, int *granted) {
  SNodeInfo* pNode = syncRaftGetNodeById(pRaft, id);
  if (pNode == NULL) {
    syncInfo("[%d:%d] ignore vote (pre-vote %d) from unknown node %d at term %" PRId64 "",
      pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
    return syncRaftTallyVotes(pRaft->tracker, rejected, granted);
  }

  if (grant) {
    syncInfo("[%d:%d] received grant (pre-vote %d) from %d at term %" PRId64 "", 
      pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
  } else {
    syncInfo("[%d:%d] received rejection (pre-vote %d) from %d at term %" PRId64 "", 
      pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
  }

  syncRaftRecordVote(pRaft->tracker, pNode->nodeId, grant);
  return syncRaftTallyVotes(pRaft->tracker, rejected, granted);
}
/*
  if (accept) {
    syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "", 
      pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
  } else {
    syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "", 
      pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
  }
  
L
lichuang 已提交
170
  int voteIndex = syncRaftGetNodeById(pRaft, id);
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
  assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
  assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN);

  pRaft->candidateState.votes[voteIndex] = accept ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
  int granted = 0, rejected = 0;
  int i;
  for (i = 0; i < pRaft->cluster.replica; ++i) {
    if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) granted++;
    else if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_REJECT) rejected++;
  }

  if (rejectNum) *rejectNum = rejected;
  return granted;
*/

186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
/**
 * Restore the persisted term, vote and commit index from `serverState`.
 *
 * The persisted commit index must lie within
 * [pRaft->log->commitIndex, last log index]. Otherwise syncFatal() is
 * reported and the function returns without restoring anything.
 */
void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) {
  SyncIndex commitIndex = serverState->commitIndex;
  SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);

  if (commitIndex < pRaft->log->commitIndex || commitIndex > lastIndex) {
    syncFatal("[%d:%d] state.commit %"PRId64" is out of range [%" PRId64 ",%" PRId64 "]",
      pRaft->selfGroupId, pRaft->selfId, commitIndex, pRaft->log->commitIndex, lastIndex);
    return;
  }

  pRaft->log->commitIndex = commitIndex;
  pRaft->term = serverState->term;
  pRaft->voteFor = serverState->voteFor;
}

201
/**
 * syncRaftProgressVisit() callback: send an append to one peer.
 *
 * `arg` is the owning SSyncRaft. The node's own progress is skipped.
 */
static void visitProgressSendAppend(SSyncRaftProgress* progress, void* arg) {
  SSyncRaft* pRaft = (SSyncRaft*)arg;
  if (pRaft->selfId == progress->id) {
    return;
  }

  syncRaftMaybeSendAppend(pRaft, progress, true);
}

210 211
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
212 213
void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
  syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft);
214 215
}

L
lichuang 已提交
216
/**
 * Look up the SNodeInfo registered for `id` in pRaft->nodeInfoMap.
 *
 * The map is queried with the raw bytes of `id` as key. The insertion site
 * presumably uses the same key length -- confirm there.
 *
 * @return the node info for `id`, or NULL if the map has no entry for it.
 */
SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id) {
  // The key length is the size of the id value itself. The previous
  // sizeof(SyncNodeId*) could read past `id` and produce a mismatching key.
  SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(id));
  return (ppNode != NULL) ? *ppNode : NULL;
}

225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
/**
 * Hook for clearing role-specific state. It is called by
 * syncRaftBecomeFollower, syncRaftBecomePreCandidate,
 * syncRaftBecomeCandidate and stepLeader.
 *
 * Currently a no-op that always returns 0. A non-void function falling off
 * its end without a return value is undefined behavior if the value is used.
 */
static int convertClear(SSyncRaft* pRaft) {
  (void)pRaft;
  return 0;
}

/**
 * Message handler installed while in the follower role.
 * Not implemented yet: every message is ignored and 0 is returned.
 */
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
  (void)pRaft;
  (void)pMsg;
  return 0;
}

/**
 * Message handler for the candidate and pre-candidate roles.
 *
 * - RAFT_MSG_INTERNAL_PROP: ignored.
 * - RAFT_MSG_VOTE_RESP: handed to syncRaftHandleVoteRespMessage().
 * - RAFT_MSG_APPEND: step down to follower at the current term, with the
 *   sender as leader, then handle the append.
 *
 * Other message types are ignored. Always returns 0.
 */
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
  // TODO: only handle vote responses corresponding to our candidacy (while in
  // the candidate state, we may get stale pre-vote responses in this term from
  // our pre-candidate state). The response type is not distinguished yet.
  switch (pMsg->msgType) {
    case RAFT_MSG_INTERNAL_PROP:
      break;
    case RAFT_MSG_VOTE_RESP:
      syncRaftHandleVoteRespMessage(pRaft, pMsg);
      break;
    case RAFT_MSG_APPEND:
      syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from);
      syncRaftHandleAppendEntriesMessage(pRaft, pMsg);
      break;
    default:
      break;
  }
  return 0;
}

/**
 * Message handler installed while in the leader role.
 * Not implemented yet: it only invokes convertClear() and returns 0.
 */
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
  (void)pMsg;
  (void)convertClear(pRaft);
  return 0;
}

L
lichuang 已提交
261
// tickElection is run by followers and candidates after r.electionTimeout.
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
static void tickElection(SSyncRaft* pRaft) {
  pRaft->electionElapsed += 1;

  if (!syncRaftIsPromotable(pRaft)) {
    return;
  }

  if (!syncRaftIsPastElectionTimeout(pRaft)) {
    return;
  }

  // election timeout
  pRaft->electionElapsed = 0;
  SSyncMessage msg;
  syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId));
}

L
lichuang 已提交
279
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
280 281 282 283
static void tickHeartbeat(SSyncRaft* pRaft) {

}

284 285 286 287 288
// increaseUncommittedSize accounts for the payload of a new batch of entries
// in pRaft->uncommittedSize and reports whether the batch may be appended.
// appendEntries() drops the batch when this returns false.
//
// No size limit is enforced yet, so every batch (including the empty no-op
// entry appended by a new leader) is accepted.
// TODO: reject batches that would push uncommittedSize past a configured limit.
static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
  int i;
  for (i = 0; i < n; ++i) {
    pRaft->uncommittedSize += entries[i].buffer.len;
  }
  return true;
}

289 290 291 292 293 294 295 296 297 298
/**
 * Append `n` entries proposed by this node to its local log.
 *
 * Steps:
 * - stamp each entry with the current term and consecutive indexes after the
 *   log's last index;
 * - append the batch, unless increaseUncommittedSize() rejects it (then
 *   nothing is appended);
 * - advance this node's own progress to the new last index;
 * - try to advance the commit index.
 */
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
  SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
  SyncTerm term = pRaft->term;
  int i;

  for (i = 0; i < n; ++i) {
    entries[i].term = term;
    entries[i].index = lastIndex + 1 + i;
  }

  // Track the size of this uncommitted proposal.
  if (!increaseUncommittedSize(pRaft, entries, n)) {
    // Drop the proposal.
    return;
  }

  syncRaftLogAppend(pRaft->log, entries, n);

  // Use the latest last index after the append. Our own progress must cover
  // the entries just written, not the pre-append index captured above.
  lastIndex = syncRaftLogLastIndex(pRaft->log);

  SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId);
  assert(progress != NULL);
  syncRaftProgressMaybeUpdate(progress, lastIndex);

  // Regardless of syncRaftMaybeCommit's return, our caller will call syncRaftBroadcastAppend.
  syncRaftMaybeCommit(pRaft);
}

314 315 316 317
// syncRaftMaybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// syncRaftBroadcastAppend).
//
// TODO: not implemented yet. It always returns true and leaves the commit
// index untouched.
bool syncRaftMaybeCommit(SSyncRaft* pRaft) {
  (void)pRaft;
  return true;
}

/**
 * Trigger I/O requests for newly appended log entries or heartbeats.
 *
 * Currently a no-op that returns 0. The loop below is compiled out;
 * presumably because it still indexes the old pRaft->cluster layout rather
 * than the progress tracker -- TODO confirm and port it (compare with
 * syncRaftBroadcastAppend).
 */
static int triggerAll(SSyncRaft* pRaft) {
#if 0
  assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
  int i;

  for (i = 0; i < pRaft->cluster.replica; ++i) {
    if (i == pRaft->cluster.selfIndex) {
      continue;
    }

    syncRaftMaybeSendAppend(pRaft, pRaft->tracker->progressMap.progress[i], true);
  }
#endif
  (void)pRaft;
  return 0;
}

/** Cancel any pending leadership transfer by clearing the transferee id. */
static void abortLeaderTransfer(SSyncRaft* pRaft) {
  SSyncRaft* const raft = pRaft;
  raft->leadTransferee = SYNC_NON_NODE_ID;
}

345 346
/**
 * syncRaftProgressVisit() callback: reset one peer's progress.
 * `arg` is the owning SSyncRaft.
 */
static void resetProgress(SSyncRaftProgress* progress, void* arg) {
  SSyncRaft* pRaft = (SSyncRaft*)arg;
  syncRaftResetProgress(pRaft, progress);
}

/**
 * Reset per-term volatile state and adopt `term`.
 *
 * voteFor is cleared only when the term actually changes, so resetting to the
 * same term keeps the current vote. The function also:
 * - clears leaderId and the election/heartbeat counters;
 * - draws a new randomized election timeout;
 * - aborts any leader transfer;
 * - resets recorded votes and every peer's progress;
 * - zeroes pendingConfigIndex and uncommittedSize.
 */
static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
  if (pRaft->term != term) {
    pRaft->term = term;
    pRaft->voteFor = SYNC_NON_NODE_ID;
  }

  pRaft->leaderId = SYNC_NON_NODE_ID;

  pRaft->electionElapsed = 0;
  pRaft->heartbeatElapsed = 0;

  syncRaftRandomizedElectionTimeout(pRaft);

  abortLeaderTransfer(pRaft);

  syncRaftResetVotes(pRaft->tracker);
  syncRaftProgressVisit(pRaft->tracker, resetProgress, pRaft);

  pRaft->pendingConfigIndex = 0;
  pRaft->uncommittedSize = 0;
}