syncVoteMgr.c 8.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncVoteMgr.h"
M
Minghao Li 已提交
18 19 20 21 22 23
#include "syncUtil.h"

static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
  memset(pVotesGranted->isGranted, 0, sizeof(pVotesGranted->isGranted));
  pVotesGranted->votes = 0;
}
M
Minghao Li 已提交
24 25

SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
wafwerar's avatar
wafwerar 已提交
26
  SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted));
27 28 29 30
  if (pVotesGranted == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
M
Minghao Li 已提交
31 32
  memset(pVotesGranted, 0, sizeof(SVotesGranted));

M
Minghao Li 已提交
33 34 35 36
  pVotesGranted->replicas = &(pSyncNode->replicasId);
  pVotesGranted->replicaNum = pSyncNode->replicaNum;
  voteGrantedClearVotes(pVotesGranted);

M
Minghao Li 已提交
37
  pVotesGranted->term = 0;
M
Minghao Li 已提交
38
  pVotesGranted->quorum = pSyncNode->quorum;
M
Minghao Li 已提交
39 40 41 42 43 44 45 46
  pVotesGranted->toLeader = false;
  pVotesGranted->pSyncNode = pSyncNode;

  return pVotesGranted;
}

void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
  if (pVotesGranted != NULL) {
wafwerar's avatar
wafwerar 已提交
47
    taosMemoryFree(pVotesGranted);
M
Minghao Li 已提交
48 49 50
  }
}

M
Minghao Li 已提交
51 52 53 54 55 56 57 58 59 60 61
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) {
  pVotesGranted->replicas = &(pSyncNode->replicasId);
  pVotesGranted->replicaNum = pSyncNode->replicaNum;
  voteGrantedClearVotes(pVotesGranted);

  pVotesGranted->term = 0;
  pVotesGranted->quorum = pSyncNode->quorum;
  pVotesGranted->toLeader = false;
  pVotesGranted->pSyncNode = pSyncNode;
}

M
Minghao Li 已提交
62 63 64 65 66 67
bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
  bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
  return ret;
}

void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
M
Minghao Li 已提交
68
  ASSERT(pMsg->voteGranted == true);
M
Minghao Li 已提交
69 70

  if (pMsg->term != pVotesGranted->term) {
S
Shengliang Guan 已提交
71
    sNTrace(pVotesGranted->pSyncNode, "vote grant vnode error");
M
Minghao Li 已提交
72 73 74
    return;
  }

M
Minghao Li 已提交
75
  ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));
M
Minghao Li 已提交
76 77 78 79 80 81 82 83

  int j = -1;
  for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pVotesGranted->replicas))[i]), &(pMsg->srcId))) {
      j = i;
      break;
    }
  }
M
Minghao Li 已提交
84 85
  ASSERT(j != -1);
  ASSERT(j >= 0 && j < pVotesGranted->replicaNum);
M
Minghao Li 已提交
86 87 88 89 90

  if (pVotesGranted->isGranted[j] != true) {
    ++(pVotesGranted->votes);
    pVotesGranted->isGranted[j] = true;
  }
M
Minghao Li 已提交
91
  ASSERT(pVotesGranted->votes <= pVotesGranted->replicaNum);
M
Minghao Li 已提交
92 93 94 95
}

void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
  pVotesGranted->term = term;
M
Minghao Li 已提交
96
  voteGrantedClearVotes(pVotesGranted);
M
Minghao Li 已提交
97 98 99
  pVotesGranted->toLeader = false;
}

M
Minghao Li 已提交
100
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
101
  char   u64buf[128] = {0};
M
Minghao Li 已提交
102 103
  cJSON *pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
104 105 106 107 108 109 110
  if (pVotesGranted != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
    cJSON *pReplicas = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
    for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
    }
wafwerar's avatar
wafwerar 已提交
111
    int *arr = (int *)taosMemoryMalloc(sizeof(int) * pVotesGranted->replicaNum);
M
Minghao Li 已提交
112 113 114 115
    for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
      arr[i] = pVotesGranted->isGranted[i];
    }
    cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
wafwerar's avatar
wafwerar 已提交
116
    taosMemoryFree(arr);
M
Minghao Li 已提交
117 118 119
    cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);

    cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
S
Shengliang Guan 已提交
120
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesGranted->term);
M
Minghao Li 已提交
121 122 123 124 125 126 127 128
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
    cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
    snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);

    bool majority = voteGrantedMajority(pVotesGranted);
    cJSON_AddNumberToObject(pRoot, "majority", majority);
M
Minghao Li 已提交
129 130
  }

M
Minghao Li 已提交
131 132 133 134 135 136 137
  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
  return pJson;
}

char *voteGranted2Str(SVotesGranted *pVotesGranted) {
  cJSON *pJson = voteGranted2Json(pVotesGranted);
H
Hongze Cheng 已提交
138
  char  *serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
139 140 141 142
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
143
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
wafwerar's avatar
wafwerar 已提交
144
  SVotesRespond *pVotesRespond = taosMemoryMalloc(sizeof(SVotesRespond));
M
Minghao Li 已提交
145
  ASSERT(pVotesRespond != NULL);
M
Minghao Li 已提交
146 147 148 149 150 151 152 153 154 155
  memset(pVotesRespond, 0, sizeof(SVotesRespond));

  pVotesRespond->replicas = &(pSyncNode->replicasId);
  pVotesRespond->replicaNum = pSyncNode->replicaNum;
  pVotesRespond->term = 0;
  pVotesRespond->pSyncNode = pSyncNode;

  return pVotesRespond;
}

M
Minghao Li 已提交
156 157
void votesRespondDestory(SVotesRespond *pVotesRespond) {
  if (pVotesRespond != NULL) {
wafwerar's avatar
wafwerar 已提交
158
    taosMemoryFree(pVotesRespond);
M
Minghao Li 已提交
159 160 161
  }
}

M
Minghao Li 已提交
162 163 164 165 166 167 168
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) {
  pVotesRespond->replicas = &(pSyncNode->replicasId);
  pVotesRespond->replicaNum = pSyncNode->replicaNum;
  pVotesRespond->term = 0;
  pVotesRespond->pSyncNode = pSyncNode;
}

M
Minghao Li 已提交
169 170 171 172 173 174 175 176 177 178 179 180
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
  bool ret = false;
  for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
    if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) {
      ret = true;
      break;
    }
  }
  return ret;
}

void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
M
Minghao Li 已提交
181
  if (pVotesRespond->term != pMsg->term) {
S
Shengliang Guan 已提交
182
    sNTrace(pVotesRespond->pSyncNode, "vote respond add error");
M
Minghao Li 已提交
183 184 185
    return;
  }

M
Minghao Li 已提交
186
  for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
M
Minghao Li 已提交
187
    if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
M
Minghao Li 已提交
188
      // ASSERT(pVotesRespond->isRespond[i] == false);
M
Minghao Li 已提交
189 190 191 192
      pVotesRespond->isRespond[i] = true;
      return;
    }
  }
M
Minghao Li 已提交
193
  ASSERT(0);
M
Minghao Li 已提交
194 195
}

M
Minghao Li 已提交
196
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
M
Minghao Li 已提交
197
  pVotesRespond->term = term;
M
Minghao Li 已提交
198 199 200 201 202 203 204 205 206
  memset(pVotesRespond->isRespond, 0, sizeof(pVotesRespond->isRespond));
  /*
    for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
      pVotesRespond->isRespond[i] = false;
    }
  */
}

cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
207
  char   u64buf[128] = {0};
M
Minghao Li 已提交
208 209
  cJSON *pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
210 211 212 213 214 215 216 217
  if (pVotesRespond != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
    cJSON *pReplicas = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
    for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i]));
    }
    int  respondNum = 0;
wafwerar's avatar
wafwerar 已提交
218
    int *arr = (int *)taosMemoryMalloc(sizeof(int) * pVotesRespond->replicaNum);
M
Minghao Li 已提交
219 220 221 222 223
    for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
      arr[i] = pVotesRespond->isRespond[i];
      if (pVotesRespond->isRespond[i]) {
        respondNum++;
      }
M
Minghao Li 已提交
224
    }
M
Minghao Li 已提交
225
    cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum);
wafwerar's avatar
wafwerar 已提交
226
    taosMemoryFree(arr);
M
Minghao Li 已提交
227 228 229
    cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
    cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);

S
Shengliang Guan 已提交
230
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesRespond->term);
M
Minghao Li 已提交
231 232 233
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
M
Minghao Li 已提交
234
  }
M
Minghao Li 已提交
235 236 237 238 239 240 241 242

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
  return pJson;
}

char *votesRespond2Str(SVotesRespond *pVotesRespond) {
  cJSON *pJson = votesRespond2Json(pVotesRespond);
H
Hongze Cheng 已提交
243
  char  *serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
244 245
  cJSON_Delete(pJson);
  return serialized;
M
Minghao Li 已提交
246
}