syncVoteMgr.c 8.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncVoteMgr.h"
M
Minghao Li 已提交
17 18 19 20 21 22 23
#include "syncUtil.h"

// SVotesGranted -----------------------------
static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
  memset(pVotesGranted->isGranted, 0, sizeof(pVotesGranted->isGranted));
  pVotesGranted->votes = 0;
}
M
Minghao Li 已提交
24 25 26 27 28 29

SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
  SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted));
  assert(pVotesGranted != NULL);
  memset(pVotesGranted, 0, sizeof(SVotesGranted));

M
Minghao Li 已提交
30 31 32 33
  pVotesGranted->replicas = &(pSyncNode->replicasId);
  pVotesGranted->replicaNum = pSyncNode->replicaNum;
  voteGrantedClearVotes(pVotesGranted);

M
Minghao Li 已提交
34
  pVotesGranted->term = 0;
M
Minghao Li 已提交
35
  pVotesGranted->quorum = pSyncNode->quorum;
M
Minghao Li 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
  pVotesGranted->toLeader = false;
  pVotesGranted->pSyncNode = pSyncNode;

  return pVotesGranted;
}

void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
  if (pVotesGranted != NULL) {
    free(pVotesGranted);
  }
}

bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
  bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
  return ret;
}

void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
  assert(pMsg->voteGranted == true);
  assert(pMsg->term == pVotesGranted->term);
M
Minghao Li 已提交
56 57 58 59 60 61 62 63 64 65
  assert(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));

  int j = -1;
  for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pVotesGranted->replicas))[i]), &(pMsg->srcId))) {
      j = i;
      break;
    }
  }
  assert(j != -1);
M
Minghao Li 已提交
66
  assert(j >= 0 && j < pVotesGranted->replicaNum);
M
Minghao Li 已提交
67 68 69 70 71 72

  if (pVotesGranted->isGranted[j] != true) {
    ++(pVotesGranted->votes);
    pVotesGranted->isGranted[j] = true;
  }
  assert(pVotesGranted->votes <= pVotesGranted->replicaNum);
M
Minghao Li 已提交
73 74 75 76
}

void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
  pVotesGranted->term = term;
M
Minghao Li 已提交
77
  voteGrantedClearVotes(pVotesGranted);
M
Minghao Li 已提交
78 79 80
  pVotesGranted->toLeader = false;
}

M
Minghao Li 已提交
81 82 83 84 85 86 87 88
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
  cJSON *pReplicas = cJSON_CreateArray();
  cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
  for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
M
Minghao Li 已提交
89
    cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
M
Minghao Li 已提交
90
  }
M
Minghao Li 已提交
91 92 93 94 95 96 97 98
  int *arr = (int *)malloc(sizeof(int) * pVotesGranted->replicaNum);
  for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
    arr[i] = pVotesGranted->isGranted[i];
  }
  cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
  free(arr);
  cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);

M
Minghao Li 已提交
99 100 101 102 103 104 105 106
  cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
  snprintf(u64buf, sizeof(u64buf), "%lu", pVotesGranted->term);
  cJSON_AddStringToObject(pRoot, "term", u64buf);
  cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
  cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
  snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode);
  cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);

M
Minghao Li 已提交
107 108 109
  bool majority = voteGrantedMajority(pVotesGranted);
  cJSON_AddNumberToObject(pRoot, "majority", majority);

M
Minghao Li 已提交
110 111 112 113 114 115 116
  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
  return pJson;
}

char *voteGranted2Str(SVotesGranted *pVotesGranted) {
  cJSON *pJson = voteGranted2Json(pVotesGranted);
M
Minghao Li 已提交
117
  char * serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
118 119 120 121
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
// for debug -------------------
void voteGrantedPrint(SVotesGranted *pObj) {
  char *serialized = voteGranted2Str(pObj);
  printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  free(serialized);
}

void voteGrantedPrint2(char *s, SVotesGranted *pObj) {
  char *serialized = voteGranted2Str(pObj);
  printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  free(serialized);
}

void voteGrantedLog(SVotesGranted *pObj) {
  char *serialized = voteGranted2Str(pObj);
  sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized);
  free(serialized);
}

void voteGrantedLog2(char *s, SVotesGranted *pObj) {
  char *serialized = voteGranted2Str(pObj);
  sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
  free(serialized);
}

M
Minghao Li 已提交
149
// SVotesRespond -----------------------------
M
Minghao Li 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
  SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
  assert(pVotesRespond != NULL);
  memset(pVotesRespond, 0, sizeof(SVotesRespond));

  pVotesRespond->replicas = &(pSyncNode->replicasId);
  pVotesRespond->replicaNum = pSyncNode->replicaNum;
  pVotesRespond->term = 0;
  pVotesRespond->pSyncNode = pSyncNode;

  return pVotesRespond;
}

M
Minghao Li 已提交
163 164 165 166 167 168
void votesRespondDestory(SVotesRespond *pVotesRespond) {
  if (pVotesRespond != NULL) {
    free(pVotesRespond);
  }
}

M
Minghao Li 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
  bool ret = false;
  for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
    if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) {
      ret = true;
      break;
    }
  }
  return ret;
}

void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
  assert(pVotesRespond->term == pMsg->term);
  for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
M
Minghao Li 已提交
183 184
    if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
      // assert(pVotesRespond->isRespond[i] == false);
M
Minghao Li 已提交
185 186 187 188 189 190 191
      pVotesRespond->isRespond[i] = true;
      return;
    }
  }
  assert(0);
}

M
Minghao Li 已提交
192
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
M
Minghao Li 已提交
193
  pVotesRespond->term = term;
M
Minghao Li 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
  memset(pVotesRespond->isRespond, 0, sizeof(pVotesRespond->isRespond));
  /*
    for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
      pVotesRespond->isRespond[i] = false;
    }
  */
}

cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

  cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
  cJSON *pReplicas = cJSON_CreateArray();
  cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
  for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
    cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i]));
  }
  int  respondNum = 0;
  int *arr = (int *)malloc(sizeof(int) * pVotesRespond->replicaNum);
M
Minghao Li 已提交
214
  for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
M
Minghao Li 已提交
215 216 217 218
    arr[i] = pVotesRespond->isRespond[i];
    if (pVotesRespond->isRespond[i]) {
      respondNum++;
    }
M
Minghao Li 已提交
219
  }
M
Minghao Li 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
  cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum);
  free(arr);
  cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
  cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);

  snprintf(u64buf, sizeof(u64buf), "%lu", pVotesRespond->term);
  cJSON_AddStringToObject(pRoot, "term", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
  cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
  return pJson;
}

char *votesRespond2Str(SVotesRespond *pVotesRespond) {
  cJSON *pJson = votesRespond2Json(pVotesRespond);
  char * serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
M
Minghao Li 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
}

// for debug -------------------
void votesRespondPrint(SVotesRespond *pObj) {
  char *serialized = votesRespond2Str(pObj);
  printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  free(serialized);
}

void votesRespondPrint2(char *s, SVotesRespond *pObj) {
  char *serialized = votesRespond2Str(pObj);
  printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  free(serialized);
}

void votesRespondLog(SVotesRespond *pObj) {
  char *serialized = votesRespond2Str(pObj);
  sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized);
  free(serialized);
}

void votesRespondLog2(char *s, SVotesRespond *pObj) {
  char *serialized = votesRespond2Str(pObj);
  sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
  free(serialized);
M
Minghao Li 已提交
267
}