syncRaftCfg.c 13.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17 18 19
#include "syncRaftCfg.h"
#include "syncUtil.h"

20 21 22 23 24 25
// Open an existing raft config-index file and load its contents into a new handle.
// The file must already exist; it is opened read/write without a create flag.
//
// path: location of the index file; copied (truncated if necessary) into the handle.
// Returns the heap-allocated handle, or NULL when the handle or the read buffer
// cannot be allocated. Open, read and parse failures are reported through tAssert.
SRaftCfgIndex *raftCfgIndexOpen(const char *path) {
  // Size the allocation from the pointee so it always matches the handle's real type.
  SRaftCfgIndex *pRaftCfgIndex = taosMemoryMalloc(sizeof(*pRaftCfgIndex));
  if (pRaftCfgIndex == NULL) {
    return NULL;
  }

  int32_t bufLen = MAX_CONFIG_INDEX_COUNT * 16;
  char   *pBuf = taosMemoryMalloc(bufLen);
  if (pBuf == NULL) {
    taosMemoryFree(pRaftCfgIndex);
    return NULL;
  }
  memset(pBuf, 0, bufLen);

  snprintf(pRaftCfgIndex->path, sizeof(pRaftCfgIndex->path), "%s", path);

  pRaftCfgIndex->pFile = taosOpenFile(pRaftCfgIndex->path, TD_FILE_READ | TD_FILE_WRITE);
  tAssert(pRaftCfgIndex->pFile != NULL);

  taosLSeekFile(pRaftCfgIndex->pFile, 0, SEEK_SET);

  // Read at most bufLen - 1 bytes: the buffer was zero-filled, so the last byte stays '\0'
  // and the text handed to the parser is always a terminated C string.
  int64_t len = taosReadFile(pRaftCfgIndex->pFile, pBuf, bufLen - 1);
  tAssert(len > 0);

  int32_t ret = raftCfgIndexFromStr(pBuf, pRaftCfgIndex);
  tAssert(ret == 0);

  taosMemoryFree(pBuf);

  return pRaftCfgIndex;
}

// Close the file held by a config-index handle and free the handle itself.
// A NULL handle is accepted and ignored. Always returns 0.
int32_t raftCfgIndexClose(SRaftCfgIndex *pRaftCfgIndex) {
  if (pRaftCfgIndex == NULL) {
    return 0;
  }

  int64_t closeRc = taosCloseFile(&pRaftCfgIndex->pFile);
  tAssert(closeRc == 0);

  taosMemoryFree(pRaftCfgIndex);
  return 0;
}

// Serialize the config index to JSON text and write it to the handle's file
// starting at offset 0, then fsync the file. The terminating '\0' is written too.
//
// Returns 0 on success, -1 when the index could not be serialized.
// A short write is reported through tAssert.
int32_t raftCfgIndexPersist(SRaftCfgIndex *pRaftCfgIndex) {
  tAssert(pRaftCfgIndex != NULL);

  char *s = raftCfgIndex2Str(pRaftCfgIndex);
  if (s == NULL) {
    // No text was produced; strlen() on NULL would be undefined behavior.
    return -1;
  }

  // Measure once, as a signed value, so it compares cleanly with the write result.
  int64_t writeLen = (int64_t)strlen(s) + 1;

  taosLSeekFile(pRaftCfgIndex->pFile, 0, SEEK_SET);

  int64_t ret = taosWriteFile(pRaftCfgIndex->pFile, s, writeLen);
  tAssert(ret == writeLen);

  taosMemoryFree(s);
  taosFsyncFile(pRaftCfgIndex->pFile);
  return 0;
}

// Append configIndex to the handle's config-index array and increment the count.
//
// Returns 0 on success, -1 when the count has already reached MAX_CONFIG_INDEX_COUNT.
int32_t raftCfgIndexAddConfigIndex(SRaftCfgIndex *pRaftCfgIndex, SyncIndex configIndex) {
  // The slot written below is configIndexArr[configIndexCount], so the count must be
  // strictly less than the limit before appending.
  // NOTE(review): assumes configIndexArr holds MAX_CONFIG_INDEX_COUNT elements - confirm in the header.
  tAssert(pRaftCfgIndex->configIndexCount < MAX_CONFIG_INDEX_COUNT);
  if (pRaftCfgIndex->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    // Refuse the append explicitly in case tAssert returns instead of aborting.
    return -1;
  }

  (pRaftCfgIndex->configIndexArr)[pRaftCfgIndex->configIndexCount] = configIndex;
  ++(pRaftCfgIndex->configIndexCount);
  return 0;
}

// Build the JSON representation of a config index:
//   { "SRaftCfgIndex": { "configIndexCount": <n>,
//                        "configIndexArr": [ { "index": "<decimal int64>" }, ... ] } }
// Each index is emitted as a decimal string. Returns the newly created outer object.
cJSON *raftCfgIndex2Json(SRaftCfgIndex *pRaftCfgIndex) {
  cJSON *pBody = cJSON_CreateObject();
  cJSON_AddNumberToObject(pBody, "configIndexCount", pRaftCfgIndex->configIndexCount);

  cJSON *pEntries = cJSON_CreateArray();
  cJSON_AddItemToObject(pBody, "configIndexArr", pEntries);

  char text[128];
  int  idx = 0;
  while (idx < pRaftCfgIndex->configIndexCount) {
    snprintf(text, sizeof(text), "%" PRId64, pRaftCfgIndex->configIndexArr[idx]);

    cJSON *pEntry = cJSON_CreateObject();
    cJSON_AddStringToObject(pEntry, "index", text);
    cJSON_AddItemToArray(pEntries, pEntry);

    ++idx;
  }

  // Wrap the body under its type name so the reader can locate it by key.
  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "SRaftCfgIndex", pBody);
  return pWrapper;
}

// Render a config index as JSON text.
// The intermediate JSON tree is deleted before returning; the returned string is
// released by callers in this file with taosMemoryFree().
char *raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex) {
  cJSON *pTree = raftCfgIndex2Json(pRaftCfgIndex);
  char  *pText = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return pText;
}

// Fill pRaftCfgIndex from a parsed JSON document whose top-level "SRaftCfgIndex" object
// carries "configIndexCount" and "configIndexArr" (objects with a string "index" field).
//
// Returns 0 on success, -1 when the document lists more entries than configIndexArr can hold.
// Other structural mismatches are reported through tAssert.
int32_t raftCfgIndexFromJson(const cJSON *pRoot, SRaftCfgIndex *pRaftCfgIndex) {
  cJSON *pJson = cJSON_GetObjectItem(pRoot, "SRaftCfgIndex");

  cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount");
  pRaftCfgIndex->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount);

  cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
  int    arraySize = cJSON_GetArraySize(pIndexArr);
  tAssert(arraySize == pRaftCfgIndex->configIndexCount);

  // The entry count comes from file contents; never let it exceed the destination array.
  const int capacity = (int)(sizeof(pRaftCfgIndex->configIndexArr) / sizeof(pRaftCfgIndex->configIndexArr[0]));
  if (arraySize > capacity) {
    return -1;
  }

  memset(pRaftCfgIndex->configIndexArr, 0, sizeof(pRaftCfgIndex->configIndexArr));
  for (int i = 0; i < arraySize; ++i) {
    cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
    tAssert(pIndexObj != NULL);

    // Indexes are stored as decimal strings (see the "index" field), so parse them back.
    cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
    tAssert(cJSON_IsString(pIndex));
    (pRaftCfgIndex->configIndexArr)[i] = atoll(pIndex->valuestring);
  }

  return 0;
}

// Parse JSON text and load it into pRaftCfgIndex.
//
// Returns 0 on success, -1 when the text is not valid JSON, otherwise the
// non-zero code returned by raftCfgIndexFromJson().
int32_t raftCfgIndexFromStr(const char *s, SRaftCfgIndex *pRaftCfgIndex) {
  cJSON *pRoot = cJSON_Parse(s);
  tAssert(pRoot != NULL);
  if (pRoot == NULL) {
    // Do not walk a missing document; report the parse failure instead.
    return -1;
  }

  int32_t ret = raftCfgIndexFromJson(pRoot, pRaftCfgIndex);
  tAssert(ret == 0);

  cJSON_Delete(pRoot);
  // Propagate the loader's result rather than unconditionally reporting success.
  return ret;
}

// Create the config-index file at path and write an initial index containing
// a single entry with value -1.
//
// Returns 0 on success, -1 when the file cannot be created or the initial
// index cannot be serialized. A short write is reported through tAssert.
int32_t raftCfgIndexCreateFile(const char *path) {
  TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
  if (pFile == NULL) {
    // Capture errno first: any later library call may overwrite it.
    int32_t     sysErr = errno;
    int32_t     err = terrno;
    const char *errStr = tstrerror(err);
    const char *sysErrStr = strerror(sysErr);
    sError("create raft cfg index file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr,
           sysErrStr);
    tAssert(0);

    return -1;
  }

  // Only the count and the index array are read by the serializer.
  SRaftCfgIndex raftCfgIndex;
  memset(raftCfgIndex.configIndexArr, 0, sizeof(raftCfgIndex.configIndexArr));
  raftCfgIndex.configIndexCount = 1;
  raftCfgIndex.configIndexArr[0] = -1;

  char *s = raftCfgIndex2Str(&raftCfgIndex);
  if (s == NULL) {
    // Nothing to write; release the file handle before bailing out.
    taosCloseFile(&pFile);
    return -1;
  }

  // Length includes the terminating '\0' so the file content reads back as a C string.
  int64_t writeLen = (int64_t)strlen(s) + 1;
  int64_t ret = taosWriteFile(pFile, s, writeLen);
  tAssert(ret == writeLen);

  taosMemoryFree(s);
  taosCloseFile(&pFile);
  return 0;
}

// ---------------------------------------
M
Minghao Li 已提交
163 164 165 166 167 168
// file must already exist!
SRaftCfg *raftCfgOpen(const char *path) {
  SRaftCfg *pCfg = taosMemoryMalloc(sizeof(SRaftCfg));
  snprintf(pCfg->path, sizeof(pCfg->path), "%s", path);

  pCfg->pFile = taosOpenFile(pCfg->path, TD_FILE_READ | TD_FILE_WRITE);
169
  tAssert(pCfg->pFile != NULL);
M
Minghao Li 已提交
170 171 172

  taosLSeekFile(pCfg->pFile, 0, SEEK_SET);

173
  char buf[CONFIG_FILE_LEN] = {0};
M
Minghao Li 已提交
174
  int  len = taosReadFile(pCfg->pFile, buf, sizeof(buf));
175
  tAssert(len > 0);
M
Minghao Li 已提交
176

M
Minghao Li 已提交
177
  int32_t ret = raftCfgFromStr(buf, pCfg);
178
  tAssert(ret == 0);
M
Minghao Li 已提交
179 180 181 182 183 184

  return pCfg;
}

// Close the file held by a raft config handle and free the handle itself.
// A NULL handle is accepted and ignored, matching raftCfgIndexClose(). Always returns 0.
int32_t raftCfgClose(SRaftCfg *pRaftCfg) {
  if (pRaftCfg == NULL) {
    return 0;
  }

  int64_t ret = taosCloseFile(&(pRaftCfg->pFile));
  tAssert(ret == 0);

  taosMemoryFree(pRaftCfg);
  return 0;
}

// Serialize the raft config to JSON text and overwrite the handle's file from offset 0
// with a fixed-size, zero-padded record of CONFIG_FILE_LEN bytes, then fsync the file.
//
// Returns 0 on success, -1 when the config cannot be serialized or its text
// (including the terminating '\0') does not fit in CONFIG_FILE_LEN bytes.
// A short write is reported through tAssert.
int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
  tAssert(pRaftCfg != NULL);

  char *s = raftCfg2Str(pRaftCfg);
  if (s == NULL) {
    // No text was produced; strlen() on NULL would be undefined behavior.
    return -1;
  }

  if (strlen(s) + 1 > CONFIG_FILE_LEN) {
    sError("too long config str:%s", s);
    tAssert(0);
    // Never persist a truncated document: it would not parse when read back.
    taosMemoryFree(s);
    return -1;
  }

  // Zero-initialized, so everything after the text is '\0' padding.
  char buf[CONFIG_FILE_LEN] = {0};
  snprintf(buf, sizeof(buf), "%s", s);
  taosMemoryFree(s);

  taosLSeekFile(pRaftCfg->pFile, 0, SEEK_SET);
  int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf));
  tAssert(ret == (int64_t)sizeof(buf));

  taosFsyncFile(pRaftCfg->pFile);
  return 0;
}

216
// Append configIndex to the raft config's config-index array and increment the count.
//
// Returns 0 on success, -1 when the count has already reached MAX_CONFIG_INDEX_COUNT.
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex) {
  // The slot written below is configIndexArr[configIndexCount], so the count must be
  // strictly less than the limit before appending.
  // NOTE(review): assumes configIndexArr holds MAX_CONFIG_INDEX_COUNT elements - confirm in the header.
  tAssert(pRaftCfg->configIndexCount < MAX_CONFIG_INDEX_COUNT);
  if (pRaftCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    // Refuse the append explicitly in case tAssert returns instead of aborting.
    return -1;
  }

  (pRaftCfg->configIndexArr)[pRaftCfg->configIndexCount] = configIndex;
  ++(pRaftCfg->configIndexCount);
  return 0;
}

M
Minghao Li 已提交
223
// Build the JSON representation of a sync config:
//   { "replicaNum": <n>, "myIndex": <i>,
//     "nodeInfo": [ { "nodePort": <port>, "nodeFqdn": "<fqdn>" }, ... ] }
// One nodeInfo entry is emitted per replica. A NULL pSyncCfg yields an empty object.
// Returns the newly created object.
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
  cJSON *pRoot = cJSON_CreateObject();

  if (pSyncCfg != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncCfg->replicaNum);
    cJSON_AddNumberToObject(pRoot, "myIndex", pSyncCfg->myIndex);

    cJSON *pNodeInfoArr = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "nodeInfo", pNodeInfoArr);
    for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
      cJSON *pNodeInfo = cJSON_CreateObject();
      cJSON_AddNumberToObject(pNodeInfo, "nodePort", ((pSyncCfg->nodeInfo)[i]).nodePort);
      cJSON_AddStringToObject(pNodeInfo, "nodeFqdn", ((pSyncCfg->nodeInfo)[i]).nodeFqdn);
      cJSON_AddItemToArray(pNodeInfoArr, pNodeInfo);
    }
  }

  return pRoot;
}

// Fill pSyncCfg from a JSON object carrying "replicaNum", "myIndex" and a "nodeInfo"
// array of { "nodePort", "nodeFqdn" } objects. pSyncCfg is zeroed first.
//
// pRoot is the sync-config object itself, not a wrapper keyed by "SSyncCfg".
// Returns 0 on success, -1 when the document lists more nodes than nodeInfo can hold.
// Other structural mismatches are reported through tAssert.
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
  memset(pSyncCfg, 0, sizeof(SSyncCfg));
  const cJSON *pJson = pRoot;

  cJSON *pReplicaNum = cJSON_GetObjectItem(pJson, "replicaNum");
  tAssert(cJSON_IsNumber(pReplicaNum));
  pSyncCfg->replicaNum = cJSON_GetNumberValue(pReplicaNum);

  cJSON *pMyIndex = cJSON_GetObjectItem(pJson, "myIndex");
  tAssert(cJSON_IsNumber(pMyIndex));
  pSyncCfg->myIndex = cJSON_GetNumberValue(pMyIndex);

  cJSON *pNodeInfoArr = cJSON_GetObjectItem(pJson, "nodeInfo");
  int    arraySize = cJSON_GetArraySize(pNodeInfoArr);
  tAssert(arraySize == pSyncCfg->replicaNum);

  // The node count comes from file contents; never let it exceed the destination array.
  const int maxNodes = (int)(sizeof(pSyncCfg->nodeInfo) / sizeof(pSyncCfg->nodeInfo[0]));
  if (arraySize > maxNodes) {
    return -1;
  }

  for (int i = 0; i < arraySize; ++i) {
    cJSON *pNodeInfo = cJSON_GetArrayItem(pNodeInfoArr, i);
    tAssert(pNodeInfo != NULL);

    cJSON *pNodePort = cJSON_GetObjectItem(pNodeInfo, "nodePort");
    tAssert(cJSON_IsNumber(pNodePort));
    ((pSyncCfg->nodeInfo)[i]).nodePort = cJSON_GetNumberValue(pNodePort);

    // snprintf bounds the copy to the destination and always terminates it.
    cJSON *pNodeFqdn = cJSON_GetObjectItem(pNodeInfo, "nodeFqdn");
    tAssert(cJSON_IsString(pNodeFqdn));
    snprintf(((pSyncCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pSyncCfg->nodeInfo)[i]).nodeFqdn), "%s",
             pNodeFqdn->valuestring);
  }

  return 0;
}

// Build the JSON representation of a raft config. The result is an object with a single
// "RaftCfg" member holding, in order: "SSyncCfg", "isStandBy", "snapshotStrategy",
// "batchSize", "lastConfigIndex", "configIndexCount" and "configIndexArr".
// 64-bit indexes are emitted as decimal strings. Returns the newly created outer object.
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
  char numText[128];

  cJSON *pBody = cJSON_CreateObject();

  cJSON *pSyncCfgJson = syncCfg2Json(&pRaftCfg->cfg);
  cJSON_AddItemToObject(pBody, "SSyncCfg", pSyncCfgJson);

  cJSON_AddNumberToObject(pBody, "isStandBy", pRaftCfg->isStandBy);
  cJSON_AddNumberToObject(pBody, "snapshotStrategy", pRaftCfg->snapshotStrategy);
  cJSON_AddNumberToObject(pBody, "batchSize", pRaftCfg->batchSize);

  snprintf(numText, sizeof(numText), "%" PRId64, pRaftCfg->lastConfigIndex);
  cJSON_AddStringToObject(pBody, "lastConfigIndex", numText);

  cJSON_AddNumberToObject(pBody, "configIndexCount", pRaftCfg->configIndexCount);

  cJSON *pEntries = cJSON_CreateArray();
  cJSON_AddItemToObject(pBody, "configIndexArr", pEntries);

  int idx = 0;
  while (idx < pRaftCfg->configIndexCount) {
    snprintf(numText, sizeof(numText), "%" PRId64, pRaftCfg->configIndexArr[idx]);

    cJSON *pEntry = cJSON_CreateObject();
    cJSON_AddStringToObject(pEntry, "index", numText);
    cJSON_AddItemToArray(pEntries, pEntry);

    ++idx;
  }

  // Wrap the body under "RaftCfg" so the reader can locate it by key.
  cJSON *pWrapper = cJSON_CreateObject();
  cJSON_AddItemToObject(pWrapper, "RaftCfg", pBody);
  return pWrapper;
}

// Render a raft config as JSON text.
// The intermediate JSON tree is deleted before returning; the returned string is
// released by callers in this file with taosMemoryFree().
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
  cJSON *pTree = raftCfg2Json(pRaftCfg);
  char  *pText = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return pText;
}

M
Minghao Li 已提交
311
// Create the raft config file at path and write an initial config built from
// pCfg and meta, with a single config-index entry of value -1. The file content
// is a fixed-size, zero-padded record of CONFIG_FILE_LEN bytes.
//
// Returns 0 on success, -1 when the file cannot be created, the config cannot be
// serialized, or its text does not fit in CONFIG_FILE_LEN bytes.
// A short write is reported through tAssert.
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
  TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
  if (pFile == NULL) {
    // Capture errno first: any later library call may overwrite it.
    int32_t     sysErr = errno;
    int32_t     err = terrno;
    const char *errStr = tstrerror(err);
    const char *sysErrStr = strerror(sysErr);
    sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr);
    return -1;
  }

  SRaftCfg raftCfg;
  raftCfg.cfg = *pCfg;
  raftCfg.isStandBy = meta.isStandBy;
  raftCfg.batchSize = meta.batchSize;
  raftCfg.snapshotStrategy = meta.snapshotStrategy;
  raftCfg.lastConfigIndex = meta.lastConfigIndex;
  raftCfg.configIndexCount = 1;
  memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
  raftCfg.configIndexArr[0] = -1;

  char *s = raftCfg2Str(&raftCfg);
  if (s == NULL) {
    // Nothing to write; release the file handle before bailing out.
    taosCloseFile(&pFile);
    return -1;
  }

  if (strlen(s) + 1 > CONFIG_FILE_LEN) {
    sError("too long config str:%s", s);
    tAssert(0);
    // Never write a truncated document: it would not parse when read back.
    taosMemoryFree(s);
    taosCloseFile(&pFile);
    return -1;
  }

  // Zero-initialized, so everything after the text is '\0' padding.
  char buf[CONFIG_FILE_LEN] = {0};
  snprintf(buf, sizeof(buf), "%s", s);
  int64_t ret = taosWriteFile(pFile, buf, sizeof(buf));
  tAssert(ret == (int64_t)sizeof(buf));

  taosMemoryFree(s);
  taosCloseFile(&pFile);
  return 0;
}

M
Minghao Li 已提交
348 349 350 351 352 353 354
// Fill pRaftCfg from a parsed JSON document whose top-level "RaftCfg" object carries
// "isStandBy", "batchSize", "snapshotStrategy", "lastConfigIndex" (decimal string),
// "configIndexCount", "configIndexArr" and the nested "SSyncCfg" object.
//
// pRaftCfg is deliberately not zeroed here: raftCfgOpen() fills its path and file
// handle before parsing into it.
// Returns 0 on success, -1 when "lastConfigIndex" is missing or not a string, or when
// the document lists more indexes than configIndexArr can hold; otherwise the result
// of syncCfgFromJson(). Other structural mismatches are reported through tAssert.
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
  cJSON *pJson = cJSON_GetObjectItem(pRoot, "RaftCfg");

  cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
  pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);

  cJSON *pJsonBatchSize = cJSON_GetObjectItem(pJson, "batchSize");
  pRaftCfg->batchSize = cJSON_GetNumberValue(pJsonBatchSize);

  cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy");
  pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy);

  // cJSON_GetStringValue yields NULL for a missing or non-string item, and atoll(NULL)
  // is undefined behavior, so check before converting.
  cJSON      *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
  const char *lastConfigIndexStr = cJSON_GetStringValue(pJsonLastConfigIndex);
  if (lastConfigIndexStr == NULL) {
    return -1;
  }
  pRaftCfg->lastConfigIndex = atoll(lastConfigIndexStr);

  cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount");
  pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount);

  cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
  int    arraySize = cJSON_GetArraySize(pIndexArr);
  tAssert(arraySize == pRaftCfg->configIndexCount);

  // The entry count comes from file contents; never let it exceed the destination array.
  const int capacity = (int)(sizeof(pRaftCfg->configIndexArr) / sizeof(pRaftCfg->configIndexArr[0]));
  if (arraySize > capacity) {
    return -1;
  }

  memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr));
  for (int i = 0; i < arraySize; ++i) {
    cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
    tAssert(pIndexObj != NULL);

    cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
    tAssert(cJSON_IsString(pIndex));
    (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
  }

  cJSON  *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
  int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
  tAssert(code == 0);

  return code;
}

// Parse JSON text and load it into pRaftCfg.
//
// Returns 0 on success, -1 when the text is not valid JSON, otherwise the
// non-zero code returned by raftCfgFromJson().
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg) {
  cJSON *pRoot = cJSON_Parse(s);
  tAssert(pRoot != NULL);
  if (pRoot == NULL) {
    // Do not walk a missing document; report the parse failure instead.
    return -1;
  }

  int32_t ret = raftCfgFromJson(pRoot, pRaftCfg);
  tAssert(ret == 0);

  cJSON_Delete(pRoot);
  // Propagate the loader's result rather than unconditionally reporting success.
  return ret;
}