syncRequestVote.c 7.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRequestVote.h"
M
Minghao Li 已提交
17
#include "syncInt.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19 20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// TLA+ Spec
// HandleRequestVoteRequest(i, j, m) ==
//    LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
//                 \/ /\ m.mlastLogTerm = LastTerm(log[i])
//                    /\ m.mlastLogIndex >= Len(log[i])
//        grant == /\ m.mterm = currentTerm[i]
//                 /\ logOk
//                 /\ votedFor[i] \in {Nil, j}
//    IN /\ m.mterm <= currentTerm[i]
//       /\ \/ grant  /\ votedFor' = [votedFor EXCEPT ![i] = j]
//          \/ ~grant /\ UNCHANGED votedFor
//       /\ Reply([mtype        |-> RequestVoteResponse,
//                 mterm        |-> currentTerm[i],
//                 mvoteGranted |-> grant,
//                 \* mlog is used just for the `elections' history variable for
//                 \* the proof. It would not exist in a real implementation.
//                 mlog         |-> log[i],
//                 msource      |-> i,
//                 mdest        |-> j],
//                 m)
//       /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
M
Minghao Li 已提交
44
//
M
Minghao Li 已提交
45 46
// Handle an incoming RequestVote message (non-snapshot variant).
//
// ths  : the local sync node receiving the request.
// pMsg : the request; srcId, term, lastLogTerm and lastLogIndex are read.
//
// Returns -1 when the sender is not in the raft group and this node is not a
// standby (the request is only logged, no reply is sent); returns 0 after a
// reply has been built and handed to syncNodeSendMsgById().
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  // Ignore candidates that are not part of the group, unless we are a standby.
  if (!(syncNodeInRaftGroup(ths, &(pMsg->srcId)) || ths->pRaftCfg->isStandBy)) {
    syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
    return -1;
  }

  // logOk from the TLA+ spec: the candidate's last entry has a higher term, or
  // the same term and an index that is not behind ours.
  bool logOK = false;
  if (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) {
    logOK = true;
  } else if (pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) {
    logOK = (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore));
  }

  // Catch up with a newer term before deciding. The term is always updated via
  // syncNodeUpdateTerm(); a variant that picked between syncNodeUpdateTerm()
  // and syncNodeUpdateTermWithoutStepDown() depending on logOK was compiled
  // out (#if 0) in the original and is not reproduced here.
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  // grant from the TLA+ spec: same term, log is ok, and we have either not
  // voted yet or already voted for this very candidate.
  bool grant = false;
  if (logOK && (pMsg->term == ths->pRaftStore->currentTerm)) {
    grant = (!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)));
  }

  if (grant) {
    // Voting again for the same candidate is harmless.
    raftStoreVote(ths->pRaftStore, &(pMsg->srcId));

    // Do not start our own election in this round.
    syncNodeResetElectTimer(ths);
  }

  // Build the reply addressed back to the candidate.
  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->voteGranted = grant;

  // Trace both the received request (with the decision) and the outgoing reply.
  {
    char traceBuf[32];
    snprintf(traceBuf, sizeof(traceBuf), "grant:%d", pReply->voteGranted);
    syncLogRecvRequestVote(ths, pMsg, traceBuf);
    syncLogSendRequestVoteReply(ths, pReply, "");
  }

  // Convert to an rpc message, send it, then release the reply object.
  SRpcMsg rpcMsg;
  syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
  syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
  syncRequestVoteReplyDestroy(pReply);

  return 0;
}

105 106 107 108
// Decide whether the candidate's log is acceptable for granting a vote, and
// emit one event-log line describing the decision.
//
// pSyncNode : the local sync node; its last log term and index are obtained
//             from syncNodeGetLastTerm() / syncNodeGetLastIndex().
// pMsg      : the received RequestVote; lastLogTerm, lastLogIndex and term
//             are read (term is only used in the log line).
//
// Returns true when the candidate's last log term is greater than ours, or
// equal to ours with a last log index >= ours. Returns false otherwise, and
// always returns false when our own last term is SYNC_TERM_INVALID.
//
// The logged text is "logok:0, {...}" or "logok:1, {...}", exactly as before;
// the four previously duplicated logging stanzas are folded into one.
static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) {
  SyncTerm  myLastTerm = syncNodeGetLastTerm(pSyncNode);
  SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);

  bool logOK;
  if (myLastTerm == SYNC_TERM_INVALID) {
    // NOTE(review): presumably our last term could not be determined; the
    // request is rejected without comparing anything.
    logOK = false;
  } else if (pMsg->lastLogTerm > myLastTerm) {
    logOK = true;
  } else {
    logOK = (pMsg->lastLogTerm == myLastTerm) && (pMsg->lastLogIndex >= myLastIndex);
  }

  // Single trace point for every outcome.
  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf),
           "logok:%d, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
           ", recv-term:%" PRIu64 "}",
           logOK ? 1 : 0, myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
  syncNodeEventLog(pSyncNode, logBuf);

  return logOK;
}

160 161 162
// Handle an incoming RequestVote message (snapshot variant). The log check is
// delegated to syncNodeOnRequestVoteLogOK().
//
// ths  : the local sync node receiving the request.
// pMsg : the request; srcId and term are read here, the log fields are read
//        by syncNodeOnRequestVoteLogOK().
//
// Returns -1 when the sender is not in the raft group and this node is not a
// standby (the request is only logged, no reply is sent); returns 0 after a
// reply has been built and handed to syncNodeSendMsgById().
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  // Ignore candidates that are not part of the group, unless we are a standby.
  if (!(syncNodeInRaftGroup(ths, &(pMsg->srcId)) || ths->pRaftCfg->isStandBy)) {
    syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
    return -1;
  }

  // Evaluated before the term update, as in the original ordering.
  bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);

  // Catch up with a newer term before deciding. The term is always updated via
  // syncNodeUpdateTerm(); a variant that picked between syncNodeUpdateTerm()
  // and syncNodeUpdateTermWithoutStepDown() depending on logOK was compiled
  // out (#if 0) in the original and is not reproduced here.
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  // Grant only for the current term, with an acceptable log, and when we have
  // either not voted yet or already voted for this very candidate.
  bool grant = false;
  if (logOK && (pMsg->term == ths->pRaftStore->currentTerm)) {
    grant = (!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)));
  }

  if (grant) {
    // Voting again for the same candidate is harmless.
    raftStoreVote(ths->pRaftStore, &(pMsg->srcId));

    // Do not start our own election in this round.
    syncNodeResetElectTimer(ths);
  }

  // Build the reply addressed back to the candidate.
  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->voteGranted = grant;

  // Trace both the received request (with the decision) and the outgoing reply.
  {
    char traceBuf[32];
    snprintf(traceBuf, sizeof(traceBuf), "grant:%d", pReply->voteGranted);
    syncLogRecvRequestVote(ths, pMsg, traceBuf);
    syncLogSendRequestVoteReply(ths, pReply, "");
  }

  // Convert to an rpc message, send it, then release the reply object.
  SRpcMsg rpcMsg;
  syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
  syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
  syncRequestVoteReplyDestroy(pReply);

  return 0;
}