raft_message.h 5.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_LIBS_SYNC_RAFT_MESSAGE_H
#define _TD_LIBS_SYNC_RAFT_MESSAGE_H

#include "sync.h"
20
#include "sync_type.h"
21 22

/**
 * Message types handled by Raft.
 *
 * Internal messages, which are passed between threads, start with RAFT_MSG_INTERNAL_*.
 * They use only pointers and stack memory, so they need not be encoded/decoded or freed.
 *
 * Outer messages start with RAFT_MSG_* and are exchanged between cluster peers;
 * each of them needs its own encode/decode functions.
 **/
/* Discriminator stored in SSyncMessage.msgType; it selects the valid union payload. */
typedef enum RaftMessageType {
  /* internal: a client proposes a command (payload: RaftMsgInternal_Prop) */
  RAFT_MSG_INTERNAL_PROP = 1,

  /* internal: the node's election timeout fired (payload: RaftMsgInternal_Election) */
  RAFT_MSG_INTERNAL_ELECTION = 2,

  /* peer: vote request (payload: RaftMsg_Vote) */
  RAFT_MSG_VOTE = 3,
  /* peer: answer to a vote request (payload: RaftMsg_VoteResp) */
  RAFT_MSG_VOTE_RESP = 4,

  /* peer: append-entries request (payload: RaftMsg_Append_Entries) */
  RAFT_MSG_APPEND = 5,
  /* peer: answer to an append-entries request; no payload struct is declared for it in this header */
  RAFT_MSG_APPEND_RESP = 6,
} RaftMessageType;

/* Payload of RAFT_MSG_INTERNAL_PROP: a command proposed by a client. */
typedef struct RaftMsgInternal_Prop {
  /* buffer being proposed; only the pointer is kept */
  const SSyncBuffer* pBuf;

  /* NOTE(review): meaning of a "weak" proposal is not visible in this header -- confirm */
  bool isWeak;

  /* opaque pointer supplied together with the proposal */
  void* pData;
} RaftMsgInternal_Prop;

51 52 53 54
/*
 * Payload of RAFT_MSG_INTERNAL_ELECTION. It carries no data.
 * A struct without members is a GNU C extension rather than ISO C.
 */
typedef struct RaftMsgInternal_Election {
  /* intentionally empty */
} RaftMsgInternal_Election;

55
typedef struct RaftMsg_Vote {
56
  SyncRaftElectionType cType;
57 58 59 60 61
  SyncIndex lastIndex;
  SyncTerm lastTerm;
} RaftMsg_Vote;

typedef struct RaftMsg_VoteResp {
62
  bool rejected;
63
  SyncRaftElectionType cType;
64
} RaftMsg_VoteResp;
65

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
/* Payload of RAFT_MSG_APPEND: log entries sent by the leader. */
typedef struct RaftMsg_Append_Entries {
  SyncIndex prevIndex;      /* index of the log entry preceding the new ones */
  SyncTerm  prevTerm;       /* term of the entry at prevIndex */
  SyncIndex commitIndex;    /* leader's commit index */
  int       nEntries;       /* number of elements in `entries` */
  SSyncRaftEntry* entries;  /* log entries array */
} RaftMsg_Append_Entries;

83
typedef struct SSyncMessage {
84
  RaftMessageType msgType;
85
  SyncTerm term;
86
  SyncGroupId groupId;
87 88 89 90
  SyncNodeId from;

  union {
    RaftMsgInternal_Prop propose;
91 92 93

    RaftMsgInternal_Election election;

94 95
    RaftMsg_Vote vote;
    RaftMsg_VoteResp voteResp;
96 97

    RaftMsg_Append_Entries appendEntries;
98
  };
99
} SSyncMessage;
100

101 102
/**
 * Fill a caller-provided message as an internal proposal (RAFT_MSG_INTERNAL_PROP).
 *
 * The whole struct is overwritten: term is set to 0 and the members that are not
 * listed in the initializer (groupId, from) are zero-initialized. Nothing is
 * allocated; pBuf and pData are stored as given.
 *
 * @param pMsg   message to initialize; dereferenced unconditionally, must not be NULL
 * @param pBuf   buffer being proposed
 * @param pData  opaque pointer stored in the proposal
 * @param isWeak stored in propose.isWeak
 * @return pMsg
 */
static FORCE_INLINE SSyncMessage* syncInitPropMsg(SSyncMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
  *pMsg = (SSyncMessage) {
    .msgType = RAFT_MSG_INTERNAL_PROP,
    .term    = 0,
    .propose = (RaftMsgInternal_Prop) {
      .pBuf   = pBuf,
      .isWeak = isWeak,
      .pData  = pData,
    },
  };

  return pMsg;
}

115 116 117 118 119 120 121 122 123 124 125 126 127
/**
 * Fill a caller-provided message as an internal election-timeout notification
 * (RAFT_MSG_INTERNAL_ELECTION).
 *
 * The whole struct is overwritten; term is 0 and groupId is left zero-initialized.
 *
 * @param pMsg message to initialize; dereferenced unconditionally, must not be NULL
 * @param from id stored in the `from` field
 * @return pMsg
 */
static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNodeId from) {
  const SSyncMessage electionMsg = {
    .msgType  = RAFT_MSG_INTERNAL_ELECTION,
    .term     = 0,
    .from     = from,
    .election = (RaftMsgInternal_Election) { },
  };

  *pMsg = electionMsg;
  return pMsg;
}

128
/**
 * Allocate and fill a vote request (RAFT_MSG_VOTE).
 *
 * The message is obtained from malloc(); the caller owns the result.
 * NOTE(review): presumably it is released with syncFreeMessage() -- confirm.
 *
 * @param groupId   sync group the message belongs to
 * @param from      id of the sending node
 * @param term      term stored in the message
 * @param cType     kind of campaign the vote is requested for
 * @param lastIndex stored in vote.lastIndex
 * @param lastTerm  stored in vote.lastTerm
 * @return the new message, or NULL when the allocation fails
 */
static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from,
                                                SyncTerm term, SyncRaftElectionType cType,
                                                SyncIndex lastIndex, SyncTerm lastTerm) {
  SSyncMessage* pMsg = malloc(sizeof(*pMsg));
  if (pMsg == NULL) {
    return NULL;
  }

  *pMsg = (SSyncMessage) {
    .groupId = groupId,
    .from    = from,
    .term    = term,
    .msgType = RAFT_MSG_VOTE,
    .vote    = (RaftMsg_Vote) {
      .cType     = cType,
      .lastIndex = lastIndex,
      .lastTerm  = lastTerm,
    },
  };

  return pMsg;
}

150
/**
 * Allocate and fill a vote response (RAFT_MSG_VOTE_RESP).
 *
 * The message is obtained from malloc(); the caller owns the result.
 * NOTE(review): presumably it is released with syncFreeMessage() -- confirm.
 *
 * There is no term parameter, so the `term` field of the returned message is
 * zero-initialized. NOTE(review): confirm that callers set it before sending.
 *
 * @param groupId  sync group the message belongs to
 * @param from     id of the sending node
 * @param cType    kind of campaign being answered
 * @param rejected stored in voteResp.rejected
 * @return the new message, or NULL when the allocation fails
 */
static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from,
                                                  SyncRaftElectionType cType, bool rejected) {
  SSyncMessage* pMsg = malloc(sizeof(*pMsg));
  if (pMsg == NULL) {
    return NULL;
  }

  *pMsg = (SSyncMessage) {
    .groupId  = groupId,
    .from     = from,
    .msgType  = RAFT_MSG_VOTE_RESP,
    .voteResp = (RaftMsg_VoteResp) {
      .cType    = cType,
      .rejected = rejected,
    },
  };

  return pMsg;
}

169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
/**
 * Allocate and fill an append-entries request (RAFT_MSG_APPEND).
 *
 * The message itself comes from malloc(); the `entries` array is not copied,
 * only the pointer is stored in the message.
 *
 * @param groupId     sync group the message belongs to
 * @param from        id of the sending node
 * @param term        term stored in the message
 * @param prevIndex   index of the log entry preceding the new ones
 * @param prevTerm    term of the entry at prevIndex
 * @param commitIndex leader's commit index
 * @param nEntries    number of elements in `entries`
 * @param entries     log entries array
 * @return the new message, or NULL when the allocation fails
 */
static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNodeId from,
                                                  SyncTerm term, SyncIndex prevIndex, SyncTerm prevTerm,
                                                  SyncIndex commitIndex, int nEntries, SSyncRaftEntry* entries) {
  SSyncMessage* pAppend = (SSyncMessage*)malloc(sizeof(SSyncMessage));

  if (pAppend != NULL) {
    const RaftMsg_Append_Entries payload = {
      .prevIndex   = prevIndex,
      .prevTerm    = prevTerm,
      .commitIndex = commitIndex,
      .nEntries    = nEntries,
      .entries     = entries,
    };

    *pAppend = (SSyncMessage) {
      .msgType       = RAFT_MSG_APPEND,
      .term          = term,
      .groupId       = groupId,
      .from          = from,
      .appendEntries = payload,
    };
  }

  return pAppend;
}

193 194 195 196 197
/**
 * Tell whether a message type is one of the internal (thread-to-thread) types.
 *
 * @param msgType type to classify
 * @return true for RAFT_MSG_INTERNAL_PROP and RAFT_MSG_INTERNAL_ELECTION, false otherwise
 */
static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
  switch (msgType) {
    case RAFT_MSG_INTERNAL_PROP:
    case RAFT_MSG_INTERNAL_ELECTION:
      return true;
    default:
      return false;
  }
}

198
static FORCE_INLINE bool syncIsPreVoteRespMsg(const SSyncMessage* pMsg) {
199 200 201
  return pMsg->msgType == RAFT_MSG_VOTE_RESP && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
}

202
static FORCE_INLINE bool syncIsPreVoteMsg(const SSyncMessage* pMsg) {
203
  return pMsg->msgType == RAFT_MSG_VOTE && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
204 205
}

206
void syncFreeMessage(const SSyncMessage* pMsg);
207

208
// message handlers
209
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
210
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
211
int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
212

213
#endif  /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */