syncRequestVote.c 8.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRequestVote.h"
M
Minghao Li 已提交
17
#include "syncInt.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19 20 21
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// TLA+ Spec
// HandleRequestVoteRequest(i, j, m) ==
//    LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
//                 \/ /\ m.mlastLogTerm = LastTerm(log[i])
//                    /\ m.mlastLogIndex >= Len(log[i])
//        grant == /\ m.mterm = currentTerm[i]
//                 /\ logOk
//                 /\ votedFor[i] \in {Nil, j}
//    IN /\ m.mterm <= currentTerm[i]
//       /\ \/ grant  /\ votedFor' = [votedFor EXCEPT ![i] = j]
//          \/ ~grant /\ UNCHANGED votedFor
//       /\ Reply([mtype        |-> RequestVoteResponse,
//                 mterm        |-> currentTerm[i],
//                 mvoteGranted |-> grant,
//                 \* mlog is used just for the `elections' history variable for
//                 \* the proof. It would not exist in a real implementation.
//                 mlog         |-> log[i],
//                 msource      |-> i,
//                 mdest        |-> j],
//                 m)
//       /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
M
Minghao Li 已提交
44
//
M
Minghao Li 已提交
45 46
// Handle a RequestVote message sent by pMsg->srcId, following the TLA+
// HandleRequestVoteRequest spec.
//
// Steps, in order:
//   1. If the sender is not in the raft group and ths->pRaftCfg->isStandBy is
//      false, write an event log entry and return -1 without replying.
//   2. If pMsg->term is greater than the stored current term, call
//      syncNodeUpdateTerm() with the message term.
//   3. Compute logOK and grant as in the spec.
//   4. On grant, record the vote and reset the election timer.
//   5. Build a SyncRequestVoteReply, write an event log entry, send the reply
//      to the requester and destroy it.
//
// ths  : local sync node.
// pMsg : received vote request.
//
// Returns 0 once the reply has been handed to syncNodeSendMsgById() (its
// result is not checked), or -1 when the request was ignored in step 1.
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  int32_t ret = 0;

  syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg);

  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
    do {
      char     logBuf[256];
      char     host[64];
      uint16_t port;
      syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
      snprintf(logBuf, sizeof(logBuf),
               "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
               ", maybe replica already dropped",
               host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
      syncNodeEventLog(ths, logBuf);
    } while (0);

    return -1;
  }

  // maybe update term
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  // logOK: the candidate's last log entry has a greater term than ours, or the
  // same term and an index that is not smaller than our last index.
  bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
               ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
                (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));

  // grant: same term, logOK, and we have either not voted yet or already
  // voted for this same candidate.
  bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
               ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
  if (grant) {
    // maybe has already voted for pMsg->srcId
    // vote again, no harm
    raftStoreVote(ths->pRaftStore, &(pMsg->srcId));

    // forbid elect for this round
    syncNodeResetElectTimer(ths);
  }

  // send msg
  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->voteGranted = grant;

  // trace log
  do {
    char     logBuf[256];
    char     host[64];
    uint16_t port;
    syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
    snprintf(logBuf, sizeof(logBuf),
             "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
             ", reply-grant:%d",
             host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
    syncNodeEventLog(ths, logBuf);
  } while (0);

  // The reply is destroyed here, right after it has been converted and sent.
  SRpcMsg rpcMsg;
  syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
  syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
  syncRequestVoteReplyDestroy(pReply);

  return ret;
}

#if 0
// Disabled variant of syncNodeOnRequestVoteCb: everything between `#if 0` and
// `#endif` is compiled out. Compared with the active definition in this file,
// this variant:
//   - includes the stored current term in the text passed to
//     syncRequestVoteLog2();
//   - has no raft-group membership check (never returns -1);
//   - writes no event log entry before sending the reply.
// The term update, the logOK/grant decision and the reply handling are the
// same as in the active definition.
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  int32_t ret = 0;

  char logBuf[128] = {0};
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteCb== term:%" PRIu64, ths->pRaftStore->currentTerm);
  syncRequestVoteLog2(logBuf, pMsg);

  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
               ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
                (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
  bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
               ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
  if (grant) {
    // maybe has already voted for pMsg->srcId
    // vote again, no harm
    raftStoreVote(ths->pRaftStore, &(pMsg->srcId));

    // forbid elect for this round
    syncNodeResetElectTimer(ths);
  }

  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->voteGranted = grant;

  SRpcMsg rpcMsg;
  syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
  syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
  syncRequestVoteReplyDestroy(pReply);

  return ret;
}
#endif
M
Minghao Li 已提交
156

157 158 159 160
// Decide whether the log position carried by a vote request is acceptable
// compared with the local values returned by syncNodeGetLastTerm() and
// syncNodeGetLastIndex().
//
// pSyncNode : local sync node whose last term/index are queried.
// pMsg      : received vote request; only lastLogTerm and lastLogIndex are read.
//
// Returns false when the local last term is SYNC_TERM_INVALID. Otherwise
// returns true when the request's lastLogTerm is greater than the local last
// term, or equal to it with lastLogIndex >= the local last index; false in
// every other case.
static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) {
  SyncTerm  myLastTerm = syncNodeGetLastTerm(pSyncNode);
  SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);

  // No valid local last term: never report the candidate log as OK.
  if (myLastTerm == SYNC_TERM_INVALID) {
    return false;
  }

  if (pMsg->lastLogTerm > myLastTerm) {
    return true;
  }
  if (pMsg->lastLogTerm == myLastTerm && pMsg->lastLogIndex >= myLastIndex) {
    return true;
  }

  return false;
}

175 176 177
// Handle a RequestVote message sent by pMsg->srcId. The flow matches
// syncNodeOnRequestVoteCb, except that the log check is delegated to
// syncNodeOnRequestVoteLogOK() and there is no syncRequestVoteLog2() call.
//
// Steps, in order:
//   1. If the sender is not in the raft group and ths->pRaftCfg->isStandBy is
//      false, write an event log entry and return -1 without replying.
//   2. If pMsg->term is greater than the stored current term, call
//      syncNodeUpdateTerm() with the message term.
//   3. Compute logOK via syncNodeOnRequestVoteLogOK() and derive grant.
//   4. On grant, record the vote and reset the election timer.
//   5. Build a SyncRequestVoteReply, write an event log entry, send the reply
//      to the requester and destroy it.
//
// ths  : local sync node.
// pMsg : received vote request.
//
// Returns 0 once the reply has been handed to syncNodeSendMsgById() (its
// result is not checked), or -1 when the request was ignored in step 1.
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
    do {
      char     logBuf[256];
      char     host[64];
      uint16_t port;
      syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
      snprintf(logBuf, sizeof(logBuf),
               "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
               ", maybe replica already dropped",
               host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
      syncNodeEventLog(ths, logBuf);
    } while (0);

    return -1;
  }

  // maybe update term
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);

  // grant: same term, logOK, and we have either not voted yet or already
  // voted for this same candidate.
  bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
               ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
  if (grant) {
    // maybe has already voted for pMsg->srcId
    // vote again, no harm
    raftStoreVote(ths->pRaftStore, &(pMsg->srcId));

    // forbid elect for this round
    syncNodeResetElectTimer(ths);
  }

  // send msg
  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->voteGranted = grant;

  // trace log
  do {
    char     logBuf[256];
    char     host[64];
    uint16_t port;
    syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
    snprintf(logBuf, sizeof(logBuf),
             "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
             ", reply-grant:%d",
             host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
    syncNodeEventLog(ths, logBuf);
  } while (0);

  // The reply is destroyed here, right after it has been converted and sent.
  SRpcMsg rpcMsg;
  syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
  syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
  syncRequestVoteReplyDestroy(pReply);

  return 0;
}