You need to sign in or sign up before continuing.
syncVoteMgr.c 9.6 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncVoteMgr.h"
M
Minghao Li 已提交
17 18 19 20 21 22 23
#include "syncUtil.h"

// SVotesGranted -----------------------------
static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
  memset(pVotesGranted->isGranted, 0, sizeof(pVotesGranted->isGranted));
  pVotesGranted->votes = 0;
}
M
Minghao Li 已提交
24 25

SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
wafwerar's avatar
wafwerar 已提交
26
  SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted));
M
Minghao Li 已提交
27 28 29
  assert(pVotesGranted != NULL);
  memset(pVotesGranted, 0, sizeof(SVotesGranted));

M
Minghao Li 已提交
30 31 32 33
  pVotesGranted->replicas = &(pSyncNode->replicasId);
  pVotesGranted->replicaNum = pSyncNode->replicaNum;
  voteGrantedClearVotes(pVotesGranted);

M
Minghao Li 已提交
34
  pVotesGranted->term = 0;
M
Minghao Li 已提交
35
  pVotesGranted->quorum = pSyncNode->quorum;
M
Minghao Li 已提交
36 37 38 39 40 41 42 43
  pVotesGranted->toLeader = false;
  pVotesGranted->pSyncNode = pSyncNode;

  return pVotesGranted;
}

void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
  if (pVotesGranted != NULL) {
wafwerar's avatar
wafwerar 已提交
44
    taosMemoryFree(pVotesGranted);
M
Minghao Li 已提交
45 46 47
  }
}

M
Minghao Li 已提交
48 49 50 51 52 53 54 55 56 57 58
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) {
  pVotesGranted->replicas = &(pSyncNode->replicasId);
  pVotesGranted->replicaNum = pSyncNode->replicaNum;
  voteGrantedClearVotes(pVotesGranted);

  pVotesGranted->term = 0;
  pVotesGranted->quorum = pSyncNode->quorum;
  pVotesGranted->toLeader = false;
  pVotesGranted->pSyncNode = pSyncNode;
}

M
Minghao Li 已提交
59 60 61 62 63 64 65 66
bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
  bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
  return ret;
}

void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
  assert(pMsg->voteGranted == true);
  assert(pMsg->term == pVotesGranted->term);
M
Minghao Li 已提交
67 68 69 70 71 72 73 74 75 76
  assert(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));

  int j = -1;
  for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pVotesGranted->replicas))[i]), &(pMsg->srcId))) {
      j = i;
      break;
    }
  }
  assert(j != -1);
M
Minghao Li 已提交
77
  assert(j >= 0 && j < pVotesGranted->replicaNum);
M
Minghao Li 已提交
78 79 80 81 82 83

  if (pVotesGranted->isGranted[j] != true) {
    ++(pVotesGranted->votes);
    pVotesGranted->isGranted[j] = true;
  }
  assert(pVotesGranted->votes <= pVotesGranted->replicaNum);
M
Minghao Li 已提交
84 85 86 87
}

void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
  pVotesGranted->term = term;
M
Minghao Li 已提交
88
  voteGrantedClearVotes(pVotesGranted);
M
Minghao Li 已提交
89 90 91
  pVotesGranted->toLeader = false;
}

M
Minghao Li 已提交
92 93 94 95
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
96 97 98 99 100 101 102
  if (pVotesGranted != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
    cJSON *pReplicas = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
    for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
    }
wafwerar's avatar
wafwerar 已提交
103
    int *arr = (int *)taosMemoryMalloc(sizeof(int) * pVotesGranted->replicaNum);
M
Minghao Li 已提交
104 105 106 107
    for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
      arr[i] = pVotesGranted->isGranted[i];
    }
    cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
wafwerar's avatar
wafwerar 已提交
108
    taosMemoryFree(arr);
M
Minghao Li 已提交
109 110 111 112 113 114 115 116 117 118 119 120
    cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);

    cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
    snprintf(u64buf, sizeof(u64buf), "%lu", pVotesGranted->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
    cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
    snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);

    bool majority = voteGrantedMajority(pVotesGranted);
    cJSON_AddNumberToObject(pRoot, "majority", majority);
M
Minghao Li 已提交
121 122
  }

M
Minghao Li 已提交
123 124 125 126 127 128 129
  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
  return pJson;
}

char *voteGranted2Str(SVotesGranted *pVotesGranted) {
  cJSON *pJson = voteGranted2Json(pVotesGranted);
M
Minghao Li 已提交
130
  char * serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
131 132 133 134
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
135 136 137 138 139
// for debug -------------------
void voteGrantedPrint(SVotesGranted *pObj) {
  char *serialized = voteGranted2Str(pObj);
  printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
140
  taosMemoryFree(serialized);
M
Minghao Li 已提交
141 142 143 144 145 146
}

void voteGrantedPrint2(char *s, SVotesGranted *pObj) {
  char *serialized = voteGranted2Str(pObj);
  printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
147
  taosMemoryFree(serialized);
M
Minghao Li 已提交
148 149 150 151 152
}

void voteGrantedLog(SVotesGranted *pObj) {
  char *serialized = voteGranted2Str(pObj);
  sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
153
  taosMemoryFree(serialized);
M
Minghao Li 已提交
154 155 156 157 158
}

void voteGrantedLog2(char *s, SVotesGranted *pObj) {
  char *serialized = voteGranted2Str(pObj);
  sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
wafwerar's avatar
wafwerar 已提交
159
  taosMemoryFree(serialized);
M
Minghao Li 已提交
160 161
}

M
Minghao Li 已提交
162
// SVotesRespond -----------------------------
M
Minghao Li 已提交
163
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
wafwerar's avatar
wafwerar 已提交
164
  SVotesRespond *pVotesRespond = taosMemoryMalloc(sizeof(SVotesRespond));
M
Minghao Li 已提交
165 166 167 168 169 170 171 172 173 174 175
  assert(pVotesRespond != NULL);
  memset(pVotesRespond, 0, sizeof(SVotesRespond));

  pVotesRespond->replicas = &(pSyncNode->replicasId);
  pVotesRespond->replicaNum = pSyncNode->replicaNum;
  pVotesRespond->term = 0;
  pVotesRespond->pSyncNode = pSyncNode;

  return pVotesRespond;
}

M
Minghao Li 已提交
176 177
void votesRespondDestory(SVotesRespond *pVotesRespond) {
  if (pVotesRespond != NULL) {
wafwerar's avatar
wafwerar 已提交
178
    taosMemoryFree(pVotesRespond);
M
Minghao Li 已提交
179 180 181
  }
}

M
Minghao Li 已提交
182 183 184 185 186 187 188
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) {
  pVotesRespond->replicas = &(pSyncNode->replicasId);
  pVotesRespond->replicaNum = pSyncNode->replicaNum;
  pVotesRespond->term = 0;
  pVotesRespond->pSyncNode = pSyncNode;
}

M
Minghao Li 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
  bool ret = false;
  for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
    if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) {
      ret = true;
      break;
    }
  }
  return ret;
}

void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
  assert(pVotesRespond->term == pMsg->term);
  for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
M
Minghao Li 已提交
203 204
    if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
      // assert(pVotesRespond->isRespond[i] == false);
M
Minghao Li 已提交
205 206 207 208 209 210 211
      pVotesRespond->isRespond[i] = true;
      return;
    }
  }
  assert(0);
}

M
Minghao Li 已提交
212
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
M
Minghao Li 已提交
213
  pVotesRespond->term = term;
M
Minghao Li 已提交
214 215 216 217 218 219 220 221 222 223 224 225
  memset(pVotesRespond->isRespond, 0, sizeof(pVotesRespond->isRespond));
  /*
    for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
      pVotesRespond->isRespond[i] = false;
    }
  */
}

cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
  char   u64buf[128];
  cJSON *pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
226 227 228 229 230 231 232 233
  if (pVotesRespond != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
    cJSON *pReplicas = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
    for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i]));
    }
    int  respondNum = 0;
wafwerar's avatar
wafwerar 已提交
234
    int *arr = (int *)taosMemoryMalloc(sizeof(int) * pVotesRespond->replicaNum);
M
Minghao Li 已提交
235 236 237 238 239
    for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
      arr[i] = pVotesRespond->isRespond[i];
      if (pVotesRespond->isRespond[i]) {
        respondNum++;
      }
M
Minghao Li 已提交
240
    }
M
Minghao Li 已提交
241
    cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum);
wafwerar's avatar
wafwerar 已提交
242
    taosMemoryFree(arr);
M
Minghao Li 已提交
243 244 245 246 247 248 249
    cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
    cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);

    snprintf(u64buf, sizeof(u64buf), "%lu", pVotesRespond->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
M
Minghao Li 已提交
250
  }
M
Minghao Li 已提交
251 252 253 254 255 256 257 258

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
  return pJson;
}

char *votesRespond2Str(SVotesRespond *pVotesRespond) {
  cJSON *pJson = votesRespond2Json(pVotesRespond);
M
Minghao Li 已提交
259
  char * serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
260 261
  cJSON_Delete(pJson);
  return serialized;
M
Minghao Li 已提交
262 263 264 265 266 267 268
}

// for debug -------------------
void votesRespondPrint(SVotesRespond *pObj) {
  char *serialized = votesRespond2Str(pObj);
  printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
269
  taosMemoryFree(serialized);
M
Minghao Li 已提交
270 271 272 273 274 275
}

void votesRespondPrint2(char *s, SVotesRespond *pObj) {
  char *serialized = votesRespond2Str(pObj);
  printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
276
  taosMemoryFree(serialized);
M
Minghao Li 已提交
277 278 279 280 281
}

void votesRespondLog(SVotesRespond *pObj) {
  char *serialized = votesRespond2Str(pObj);
  sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
282
  taosMemoryFree(serialized);
M
Minghao Li 已提交
283 284 285 286 287
}

void votesRespondLog2(char *s, SVotesRespond *pObj) {
  char *serialized = votesRespond2Str(pObj);
  sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
wafwerar's avatar
wafwerar 已提交
288
  taosMemoryFree(serialized);
289
}