raft_message.h 6.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_RAFT_MESSAGE_H
#define _TD_LIBS_SYNC_RAFT_MESSAGE_H

#include "sync.h"
20
#include "sync_type.h"
21 22

/**
 * The message types handled by Raft are defined below.
 *
 * Internal messages, which are passed between threads, start with RAFT_MSG_INTERNAL_*.
 * Internal messages only use pointers and stack memory, so they need not be
 * encoded/decoded or freed.
 *
 * Outer messages start with RAFT_MSG_* and are exchanged between cluster peers;
 * they need encode/decode functions to be implemented.
 **/
31
typedef enum ESyncRaftMessageType {
  // internal: a client proposes a command
  RAFT_MSG_INTERNAL_PROP = 1,

  // internal: the node's election timeout fired
  RAFT_MSG_INTERNAL_ELECTION = 2,

  // vote request and its response
  RAFT_MSG_VOTE = 3,
  RAFT_MSG_VOTE_RESP = 4,

  // append request and its response
  RAFT_MSG_APPEND = 5,
  RAFT_MSG_APPEND_RESP = 6,
} ESyncRaftMessageType;
44 45 46 47 48 49 50

/* Payload carried by a RAFT_MSG_INTERNAL_PROP message. */
typedef struct RaftMsgInternal_Prop {
  // buffer being proposed; stored as a pointer only and never written through
  const SSyncBuffer* pBuf;

  // NOTE(review): presumably marks a "weak" proposal — confirm semantics with the caller
  bool isWeak;

  // opaque pointer supplied by the proposer
  void* pData;
} RaftMsgInternal_Prop;

51 52 53 54
/* Payload of a RAFT_MSG_INTERNAL_ELECTION message; it currently has no members. */
typedef struct RaftMsgInternal_Election {
} RaftMsgInternal_Election;

55
typedef struct RaftMsg_Vote {
56
  ESyncRaftElectionType cType;
57 58 59 60 61
  SyncIndex lastIndex;
  SyncTerm lastTerm;
} RaftMsg_Vote;

typedef struct RaftMsg_VoteResp {
62
  bool rejected;
63
  ESyncRaftElectionType cType;
64
} RaftMsg_VoteResp;
65

66 67
typedef struct RaftMsg_Append_Entries {
  // index of log entry preceeding new ones
68
  SyncIndex index;
69 70

  // term of entry at prevIndex
71
  SyncTerm term;
72 73 74 75 76 77 78 79 80 81 82

  // leader's commit index.
  SyncIndex commitIndex;

  // size of the log entries array
  int nEntries;

  // log entries array
  SSyncRaftEntry* entries;
} RaftMsg_Append_Entries;

83 84 85 86
/* Payload of a RAFT_MSG_APPEND_RESP message. */
typedef struct RaftMsg_Append_Resp {
  // NOTE(review): log index reported back by the responder — exact meaning not
  // visible in this header, confirm against the append handler
  SyncIndex index;
} RaftMsg_Append_Resp;

87
typedef struct SSyncMessage {
88
  ESyncRaftMessageType msgType;
89
  SyncTerm term;
90
  SyncGroupId groupId;
91 92 93 94
  SyncNodeId from;

  union {
    RaftMsgInternal_Prop propose;
95 96 97

    RaftMsgInternal_Election election;

98 99
    RaftMsg_Vote vote;
    RaftMsg_VoteResp voteResp;
100 101

    RaftMsg_Append_Entries appendEntries;
102
    RaftMsg_Append_Resp appendResp;
103
  };
104
} SSyncMessage;
105

106 107
/**
 * Fill a caller-provided message as an internal proposal (RAFT_MSG_INTERNAL_PROP).
 *
 * The whole message is overwritten. `term` is set to 0, and the header fields
 * not named here (`groupId`, `from`) are zero-initialized by the compound literal.
 *
 * @param pMsg   message to overwrite; dereferenced unconditionally, must not be NULL
 * @param pBuf   stored in propose.pBuf (the pointer is stored, nothing is copied)
 * @param pData  stored in propose.pData
 * @param isWeak stored in propose.isWeak
 * @return pMsg
 */
static FORCE_INLINE SSyncMessage* syncInitPropMsg(SSyncMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
  const RaftMsgInternal_Prop propose = {
    .pBuf   = pBuf,
    .isWeak = isWeak,
    .pData  = pData,
  };

  *pMsg = (SSyncMessage) {
    .msgType = RAFT_MSG_INTERNAL_PROP,
    .term    = 0,
    .propose = propose,
  };

  return pMsg;
}

120 121 122 123 124 125 126 127 128 129 130 131 132
/**
 * Fill a caller-provided message as an internal election message
 * (RAFT_MSG_INTERNAL_ELECTION).
 *
 * The whole message is overwritten. `term` is set to 0 and `groupId`, which is
 * not named in the initializer, is zero-initialized.
 *
 * @param pMsg message to overwrite; dereferenced unconditionally, must not be NULL
 * @param from stored in the `from` header field
 * @return pMsg
 */
static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNodeId from) {
  const SSyncMessage electionMsg = {
    .msgType  = RAFT_MSG_INTERNAL_ELECTION,
    .term     = 0,
    .from     = from,
    .election = (RaftMsgInternal_Election) {},
  };

  *pMsg = electionMsg;
  return pMsg;
}

133
/**
 * Allocate a RAFT_MSG_VOTE message with malloc() and fill it in.
 *
 * @param groupId   stored in the `groupId` header field
 * @param from      stored in the `from` header field
 * @param term      stored in the `term` header field
 * @param cType     stored in vote.cType
 * @param lastIndex stored in vote.lastIndex
 * @param lastTerm  stored in vote.lastTerm
 * @return the new message, or NULL if the allocation failed
 */
static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from,
                                                 SyncTerm term, ESyncRaftElectionType cType,
                                                 SyncIndex lastIndex, SyncTerm lastTerm) {
  SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(*pMsg));
  if (pMsg == NULL) {
    return NULL;
  }

  const RaftMsg_Vote vote = {
    .cType     = cType,
    .lastIndex = lastIndex,
    .lastTerm  = lastTerm,
  };

  *pMsg = (SSyncMessage) {
    .msgType = RAFT_MSG_VOTE,
    .term    = term,
    .groupId = groupId,
    .from    = from,
    .vote    = vote,
  };

  return pMsg;
}

155
/**
 * Allocate a RAFT_MSG_VOTE_RESP message with malloc() and fill it in.
 *
 * This function takes no term argument: the `term` header field is not named
 * in the initializer and is therefore zero in the returned message.
 *
 * @param groupId  stored in the `groupId` header field
 * @param from     stored in the `from` header field
 * @param cType    stored in voteResp.cType
 * @param rejected stored in voteResp.rejected
 * @return the new message, or NULL if the allocation failed
 */
static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from,
                                                     ESyncRaftElectionType cType, bool rejected) {
  SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(*pMsg));
  if (pMsg == NULL) {
    return NULL;
  }

  const RaftMsg_VoteResp voteResp = {
    .rejected = rejected,
    .cType    = cType,
  };

  *pMsg = (SSyncMessage) {
    .msgType  = RAFT_MSG_VOTE_RESP,
    .groupId  = groupId,
    .from     = from,
    .voteResp = voteResp,
  };

  return pMsg;
}

174
/**
 * Allocate a RAFT_MSG_APPEND message with malloc() and fill it in.
 *
 * The `entries` pointer is stored as-is; the entries array is not copied here.
 *
 * @param groupId     stored in the `groupId` header field
 * @param from        stored in the `from` header field
 * @param term        stored in the `term` header field
 * @param logIndex    stored in appendEntries.index
 * @param logTerm     stored in appendEntries.term
 * @param commitIndex stored in appendEntries.commitIndex
 * @param nEntries    stored in appendEntries.nEntries
 * @param entries     stored in appendEntries.entries
 * @return the new message, or NULL if the allocation failed
 */
static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNodeId from,
                                                   SyncTerm term, SyncIndex logIndex, SyncTerm logTerm,
                                                   SyncIndex commitIndex, int nEntries, SSyncRaftEntry* entries) {
  SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(*pMsg));
  if (pMsg == NULL) {
    return NULL;
  }

  const RaftMsg_Append_Entries appendEntries = {
    .index       = logIndex,
    .term        = logTerm,
    .commitIndex = commitIndex,
    .nEntries    = nEntries,
    .entries     = entries,
  };

  *pMsg = (SSyncMessage) {
    .msgType       = RAFT_MSG_APPEND,
    .term          = term,
    .groupId       = groupId,
    .from          = from,
    .appendEntries = appendEntries,
  };

  return pMsg;
}

198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
/**
 * Allocate a RAFT_MSG_APPEND_RESP message with malloc() whose payload is
 * left empty (appendResp is initialized from an empty initializer).
 *
 * @param groupId stored in the `groupId` header field
 * @param from    stored in the `from` header field
 * @param term    stored in the `term` header field
 * @return the new message, or NULL if the allocation failed
 */
static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId, SyncNodeId from, SyncTerm term) {
  SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));

  if (pMsg != NULL) {
    const SSyncMessage respMsg = {
      .msgType    = RAFT_MSG_APPEND_RESP,
      .term       = term,
      .groupId    = groupId,
      .from       = from,
      .appendResp = (RaftMsg_Append_Resp) {},
    };
    *pMsg = respMsg;
  }

  return pMsg;
}

216
/**
 * Tell whether a message type is one of the internal (RAFT_MSG_INTERNAL_*) types.
 *
 * @param msgType message type to classify
 * @return true for RAFT_MSG_INTERNAL_PROP and RAFT_MSG_INTERNAL_ELECTION,
 *         false for every other value
 */
static FORCE_INLINE bool syncIsInternalMsg(ESyncRaftMessageType msgType) {
  switch (msgType) {
    case RAFT_MSG_INTERNAL_PROP:
    case RAFT_MSG_INTERNAL_ELECTION:
      return true;
    default:
      return false;
  }
}

221
static FORCE_INLINE bool syncIsPreVoteRespMsg(const SSyncMessage* pMsg) {
222 223 224
  return pMsg->msgType == RAFT_MSG_VOTE_RESP && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
}

225
static FORCE_INLINE bool syncIsPreVoteMsg(const SSyncMessage* pMsg) {
226
  return pMsg->msgType == RAFT_MSG_VOTE && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
227 228
}

229
void syncFreeMessage(const SSyncMessage* pMsg);
230

231
// message handlers
232
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
233
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
234
int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
235
int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
236

237
#endif  /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */