syncRaftCfg.c 11.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftCfg.h"
#include "cJSON.h"
#include "syncEnv.h"
#include "syncUtil.h"

// file must already exist!
SRaftCfg *raftCfgOpen(const char *path) {
  SRaftCfg *pCfg = taosMemoryMalloc(sizeof(SRaftCfg));
  snprintf(pCfg->path, sizeof(pCfg->path), "%s", path);

  pCfg->pFile = taosOpenFile(pCfg->path, TD_FILE_READ | TD_FILE_WRITE);
M
Minghao Li 已提交
27
  ASSERT(pCfg->pFile != NULL);
M
Minghao Li 已提交
28 29 30

  taosLSeekFile(pCfg->pFile, 0, SEEK_SET);

31
  char buf[1024] = {0};
M
Minghao Li 已提交
32
  int  len = taosReadFile(pCfg->pFile, buf, sizeof(buf));
M
Minghao Li 已提交
33
  ASSERT(len > 0);
M
Minghao Li 已提交
34

M
Minghao Li 已提交
35
  int32_t ret = raftCfgFromStr(buf, pCfg);
M
Minghao Li 已提交
36
  ASSERT(ret == 0);
M
Minghao Li 已提交
37 38 39 40 41 42

  return pCfg;
}

// Close the file held by pRaftCfg and release the structure itself.
// A failing close is asserted. Always returns 0.
int32_t raftCfgClose(SRaftCfg *pRaftCfg) {
  const int64_t closeCode = taosCloseFile(&pRaftCfg->pFile);
  ASSERT(closeCode == 0);

  taosMemoryFree(pRaftCfg);
  return 0;
}

// Serialize pRaftCfg to JSON and overwrite the config file from offset 0.
//
// The file content is always exactly CONFIG_FILE_LEN bytes: the JSON text
// followed by zero padding, so a shorter config fully replaces a longer one.
//
// Returns 0 on success, -1 if the config cannot be serialized, does not fit in
// CONFIG_FILE_LEN bytes, or the write comes up short.
int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
  ASSERT(pRaftCfg != NULL);

  char *s = raftCfg2Str(pRaftCfg);
  if (s == NULL) {
    sError("raft cfg persist, failed to serialize config");
    return -1;
  }

  if (strlen(s) + 1 > CONFIG_FILE_LEN) {
    sError("too long config str:%s", s);
    ASSERT(0);
    // Reached only when ASSERT is inactive: refuse to write truncated JSON.
    taosMemoryFree(s);
    return -1;
  }

  char buf[CONFIG_FILE_LEN] = {0};
  snprintf(buf, sizeof(buf), "%s", s);
  taosMemoryFree(s);

  taosLSeekFile(pRaftCfg->pFile, 0, SEEK_SET);
  int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf));
  ASSERT(ret == sizeof(buf));
  if (ret != (int64_t)sizeof(buf)) {
    return -1;
  }

  taosFsyncFile(pRaftCfg->pFile);
  return 0;
}

74 75 76 77 78 79 80
// Append configIndex to pRaftCfg->configIndexArr and bump configIndexCount.
//
// Returns 0 on success, -1 (leaving the config unchanged) when the array has
// no free slot left.
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex) {
  ASSERT(pRaftCfg->configIndexCount <= MAX_CONFIG_INDEX_COUNT);

  // The assertion above still admits count == MAX_CONFIG_INDEX_COUNT, so bound
  // the write by the real number of slots in the array.
  const uint64_t capacity = sizeof(pRaftCfg->configIndexArr) / sizeof(pRaftCfg->configIndexArr[0]);
  const int64_t  count = pRaftCfg->configIndexCount;
  if (count < 0 || (uint64_t)count >= capacity) {
    sError("raft cfg config index array is full, count:%d", (int)count);
    return -1;
  }

  (pRaftCfg->configIndexArr)[count] = configIndex;
  ++(pRaftCfg->configIndexCount);
  return 0;
}

M
Minghao Li 已提交
81
// Build a cJSON object describing pSyncCfg:
//   { "replicaNum": N, "myIndex": I, "nodeInfo": [ { "nodePort": P, "nodeFqdn": "..." }, ... ] }
// "nodeInfo" holds the first replicaNum entries of pSyncCfg->nodeInfo.
//
// A NULL pSyncCfg yields an empty object. The caller owns the returned tree
// and releases it with cJSON_Delete(). Returns NULL if the root object cannot
// be created.
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
  cJSON *pRoot = cJSON_CreateObject();
  if (pRoot == NULL || pSyncCfg == NULL) {
    return pRoot;
  }

  cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncCfg->replicaNum);
  cJSON_AddNumberToObject(pRoot, "myIndex", pSyncCfg->myIndex);

  cJSON *pNodeInfoArr = cJSON_CreateArray();
  cJSON_AddItemToObject(pRoot, "nodeInfo", pNodeInfoArr);
  for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
    cJSON *pNodeInfo = cJSON_CreateObject();
    cJSON_AddNumberToObject(pNodeInfo, "nodePort", ((pSyncCfg->nodeInfo)[i]).nodePort);
    cJSON_AddStringToObject(pNodeInfo, "nodeFqdn", ((pSyncCfg->nodeInfo)[i]).nodeFqdn);
    cJSON_AddItemToArray(pNodeInfoArr, pNodeInfo);
  }

  return pRoot;
}

// Render pSyncCfg as formatted JSON text (via syncCfg2Json + cJSON_Print).
// The intermediate JSON tree is deleted here; the returned string belongs to
// the caller.
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
  cJSON *pTree = syncCfg2Json(pSyncCfg);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
109
// Format pSyncCfg as a compact one-line summary:
//   {replica-num:N, my-index:I, fqdn0:port0, fqdn1:port1, ...}
//
// The result lives in a 512-byte heap buffer owned by the caller (free with
// taosMemoryFree). Node entries that no longer fit are dropped rather than
// written past the end of the buffer.
//
// Returns NULL when pSyncCfg is NULL or the buffer cannot be allocated.
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
  if (pSyncCfg == NULL) {
    return NULL;
  }

  const int32_t len = 512;
  char         *s = taosMemoryMalloc(len);
  if (s == NULL) {
    return NULL;
  }
  memset(s, 0, len);

  // `used` is the number of characters written so far, excluding the NUL.
  int32_t used = snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
  if (used < 0) {
    used = 0;
  } else if (used >= len) {
    used = len - 1;
  }

  for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
    const int32_t room = len - used;
    if (room <= 1) {
      break;
    }

    // Write straight into the remaining space so each entry is bounded by it.
    int32_t n = snprintf(s + used, room, "%s:%d, ", pSyncCfg->nodeInfo[i].nodeFqdn, pSyncCfg->nodeInfo[i].nodePort);
    if (n < 0) {
      break;
    }
    if (n >= room) {
      // The entry was truncated: the buffer is full, stop here.
      used = len - 1;
      break;
    }
    used += n;
  }

  // Replace the trailing ", " separator with the closing brace.
  if (used >= 2) {
    s[used - 2] = '}';
    s[used - 1] = '\0';
  }

  return s;
}

M
Minghao Li 已提交
136 137
// Fill pSyncCfg from the JSON object produced by syncCfg2Json().
//
// pSyncCfg is zeroed first. Expected members of pRoot: numeric "replicaNum"
// and "myIndex", and a "nodeInfo" array whose elements carry a numeric
// "nodePort" and a string "nodeFqdn". Malformed members are asserted.
//
// Returns 0 on success, -1 if "nodeInfo" holds more entries than
// pSyncCfg->nodeInfo can store.
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
  memset(pSyncCfg, 0, sizeof(SSyncCfg));
  const cJSON *pJson = pRoot;

  cJSON *pReplicaNum = cJSON_GetObjectItem(pJson, "replicaNum");
  ASSERT(cJSON_IsNumber(pReplicaNum));
  pSyncCfg->replicaNum = cJSON_GetNumberValue(pReplicaNum);

  cJSON *pMyIndex = cJSON_GetObjectItem(pJson, "myIndex");
  ASSERT(cJSON_IsNumber(pMyIndex));
  pSyncCfg->myIndex = cJSON_GetNumberValue(pMyIndex);

  cJSON *pNodeInfoArr = cJSON_GetObjectItem(pJson, "nodeInfo");
  int    arraySize = cJSON_GetArraySize(pNodeInfoArr);
  ASSERT(arraySize == pSyncCfg->replicaNum);

  // The count comes from the file; never let it index past the fixed-size
  // nodeInfo array.
  const int capacity = (int)(sizeof(pSyncCfg->nodeInfo) / sizeof(pSyncCfg->nodeInfo[0]));
  if (arraySize > capacity) {
    sError("sync cfg nodeInfo has %d entries, more than the supported %d", arraySize, capacity);
    return -1;
  }

  for (int i = 0; i < arraySize; ++i) {
    cJSON *pNodeInfo = cJSON_GetArrayItem(pNodeInfoArr, i);
    ASSERT(pNodeInfo != NULL);

    cJSON *pNodePort = cJSON_GetObjectItem(pNodeInfo, "nodePort");
    ASSERT(cJSON_IsNumber(pNodePort));
    ((pSyncCfg->nodeInfo)[i]).nodePort = cJSON_GetNumberValue(pNodePort);

    cJSON *pNodeFqdn = cJSON_GetObjectItem(pNodeInfo, "nodeFqdn");
    ASSERT(cJSON_IsString(pNodeFqdn));
    snprintf(((pSyncCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pSyncCfg->nodeInfo)[i]).nodeFqdn), "%s",
             pNodeFqdn->valuestring);
  }

  return 0;
}

// Parse the JSON text s and load it into pSyncCfg via syncCfgFromJson().
// Parse and conversion failures are asserted; the parsed tree is released
// before returning. Always returns 0.
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) {
  cJSON *pParsed = cJSON_Parse(s);
  ASSERT(pParsed != NULL);

  const int32_t code = syncCfgFromJson(pParsed, pSyncCfg);
  ASSERT(code == 0);

  cJSON_Delete(pParsed);
  return 0;
}

// Build the JSON tree for pRaftCfg. The result has a single top-level member
// "RaftCfg" containing:
//   "SSyncCfg"          - object from syncCfg2Json()
//   "isStandBy", "snapshotStrategy", "batchSize", "configIndexCount" - numbers
//   "lastConfigIndex"   - decimal string
//   "configIndexArr"    - array of { "index": "<decimal string>" }
// Index values are stored as strings rather than JSON numbers.
//
// The caller owns the returned tree and releases it with cJSON_Delete().
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
  cJSON *pRoot = cJSON_CreateObject();
  cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
  cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
  cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy);
  cJSON_AddNumberToObject(pRoot, "batchSize", pRaftCfg->batchSize);

  // Format through long long so the conversion is correct on platforms where
  // long is narrower than the index type.
  char buf64[128];
  snprintf(buf64, sizeof(buf64), "%lld", (long long)pRaftCfg->lastConfigIndex);
  cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64);

  cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfg->configIndexCount);
  cJSON *pIndexArr = cJSON_CreateArray();
  cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr);
  for (int i = 0; i < pRaftCfg->configIndexCount; ++i) {
    snprintf(buf64, sizeof(buf64), "%lld", (long long)(pRaftCfg->configIndexArr)[i]);
    cJSON *pIndexObj = cJSON_CreateObject();
    cJSON_AddStringToObject(pIndexObj, "index", buf64);
    cJSON_AddItemToArray(pIndexArr, pIndexObj);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
  return pJson;
}

// Render pRaftCfg as formatted JSON text (via raftCfg2Json + cJSON_Print).
// The intermediate JSON tree is deleted here; the returned string belongs to
// the caller.
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
  cJSON *pTree = raftCfg2Json(pRaftCfg);
  char  *text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
214
// Create the raft config file at path and write the initial configuration.
//
// The initial SRaftCfg is built from *pCfg and meta, with configIndexCount = 1
// and configIndexArr[0] = -1. The file content is exactly CONFIG_FILE_LEN
// bytes: the JSON text followed by zero padding.
//
// Returns 0 on success, -1 if the file cannot be created or the config cannot
// be serialized.
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
  ASSERT(pCfg != NULL);

  TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
  if (pFile == NULL) {
    // Snapshot errno before any further call has a chance to overwrite it.
    int32_t     sysErr = errno;
    int32_t     err = terrno;
    const char *errStr = tstrerror(err);
    const char *sysErrStr = strerror(sysErr);
    sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr);
    ASSERT(0);

    return -1;
  }

  SRaftCfg raftCfg;
  raftCfg.cfg = *pCfg;
  raftCfg.isStandBy = meta.isStandBy;
  raftCfg.batchSize = meta.batchSize;
  raftCfg.snapshotStrategy = meta.snapshotStrategy;
  raftCfg.lastConfigIndex = meta.lastConfigIndex;
  raftCfg.configIndexCount = 1;
  memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
  raftCfg.configIndexArr[0] = -1;

  char *s = raftCfg2Str(&raftCfg);
  if (s == NULL) {
    sError("create raft cfg file error, failed to serialize config");
    taosCloseFile(&pFile);
    return -1;
  }

  char buf[CONFIG_FILE_LEN] = {0};
  ASSERT(strlen(s) + 1 <= CONFIG_FILE_LEN);
  snprintf(buf, sizeof(buf), "%s", s);
  int64_t ret = taosWriteFile(pFile, buf, sizeof(buf));
  ASSERT(ret == sizeof(buf));

  taosMemoryFree(s);
  taosCloseFile(&pFile);
  return 0;
}

M
Minghao Li 已提交
255 256 257 258 259 260 261
// Fill pRaftCfg from the JSON tree produced by raftCfg2Json().
//
// Reads the "RaftCfg" member of pRoot and loads isStandBy, batchSize,
// snapshotStrategy, lastConfigIndex, configIndexCount, configIndexArr and the
// nested "SSyncCfg" object. Fields of pRaftCfg not listed here (for example
// path and pFile) are left untouched.
//
// Returns 0 on success, -1 if "lastConfigIndex" is not a string or
// "configIndexArr" holds more entries than pRaftCfg->configIndexArr can store.
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
  cJSON *pJson = cJSON_GetObjectItem(pRoot, "RaftCfg");

  // NOTE(review): the three numeric fields below are read without an
  // IsNumber check; confirm what should happen for files that lack them.
  cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
  pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);

  cJSON *pJsonBatchSize = cJSON_GetObjectItem(pJson, "batchSize");
  pRaftCfg->batchSize = cJSON_GetNumberValue(pJsonBatchSize);

  cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy");
  pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy);

  // cJSON_GetStringValue() yields NULL for a missing or non-string item, and
  // atoll() must not be handed NULL.
  cJSON      *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
  const char *lastConfigIndexStr = cJSON_GetStringValue(pJsonLastConfigIndex);
  ASSERT(lastConfigIndexStr != NULL);
  if (lastConfigIndexStr == NULL) {
    return -1;
  }
  pRaftCfg->lastConfigIndex = atoll(lastConfigIndexStr);

  cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount");
  pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount);

  cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
  int    arraySize = cJSON_GetArraySize(pIndexArr);
  ASSERT(arraySize == pRaftCfg->configIndexCount);

  // The count comes from the file; never let it index past the fixed-size
  // configIndexArr.
  const int capacity = (int)(sizeof(pRaftCfg->configIndexArr) / sizeof(pRaftCfg->configIndexArr[0]));
  if (arraySize > capacity) {
    sError("raft cfg configIndexArr has %d entries, more than the supported %d", arraySize, capacity);
    return -1;
  }

  memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr));
  for (int i = 0; i < arraySize; ++i) {
    cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
    ASSERT(pIndexObj != NULL);

    cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
    ASSERT(cJSON_IsString(pIndex));
    (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
  }

  cJSON  *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
  int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
  ASSERT(code == 0);

  return code;
}

// Parse the JSON text s and load it into pRaftCfg via raftCfgFromJson().
// Parse and conversion failures are asserted; the parsed tree is released
// before returning. Always returns 0.
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg) {
  cJSON *pParsed = cJSON_Parse(s);
  ASSERT(pParsed != NULL);

  const int32_t code = raftCfgFromJson(pParsed, pRaftCfg);
  ASSERT(code == 0);

  cJSON_Delete(pParsed);
  return 0;
}

M
Minghao Li 已提交
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
// for debug ----------------------
// Debug helper: print the JSON form of pCfg and its length to stdout, then
// flush all output streams. Prints nothing if serialization fails.
void syncCfgPrint(SSyncCfg *pCfg) {
  char *serialized = syncCfg2Str(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  printf("syncCfgPrint | len:%lu | %s \n", (unsigned long)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: print the caller-supplied tag s followed by the JSON form of
// pCfg to stdout, then flush all output streams. Prints nothing if
// serialization fails.
void syncCfgPrint2(char *s, SSyncCfg *pCfg) {
  char *serialized = syncCfg2Str(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  printf("syncCfgPrint2 | len:%lu | %s | %s \n", (unsigned long)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: write the JSON form of pCfg and its length to the trace log.
// Logs nothing if serialization fails.
void syncCfgLog(SSyncCfg *pCfg) {
  char *serialized = syncCfg2Str(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  sTrace("syncCfgLog | len:%lu | %s", (unsigned long)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: write the caller-supplied tag s followed by the JSON form of
// pCfg to the trace log. Logs nothing if serialization fails.
void syncCfgLog2(char *s, SSyncCfg *pCfg) {
  char *serialized = syncCfg2Str(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  sTrace("syncCfgLog2 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
333 334 335 336 337 338
// Debug helper: write the caller-supplied tag s followed by the compact
// one-line form of pCfg (syncCfg2SimpleStr) to the trace log.
// Logs nothing when no string is produced; syncCfg2SimpleStr returns NULL for
// a NULL pCfg.
void syncCfgLog3(char *s, SSyncCfg *pCfg) {
  char *serialized = syncCfg2SimpleStr(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  sTrace("syncCfgLog3 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
// Debug helper: print the JSON form of pCfg and its length to stdout, then
// flush all output streams. Prints nothing if serialization fails.
void raftCfgPrint(SRaftCfg *pCfg) {
  char *serialized = raftCfg2Str(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  printf("raftCfgPrint | len:%lu | %s \n", (unsigned long)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: print the caller-supplied tag s followed by the JSON form of
// pCfg to stdout, then flush all output streams. Prints nothing if
// serialization fails.
void raftCfgPrint2(char *s, SRaftCfg *pCfg) {
  char *serialized = raftCfg2Str(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  printf("raftCfgPrint2 | len:%lu | %s | %s \n", (unsigned long)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: write the JSON form of pCfg and its length to the trace log.
// Logs nothing if serialization fails.
void raftCfgLog(SRaftCfg *pCfg) {
  char *serialized = raftCfg2Str(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  sTrace("raftCfgLog | len:%lu | %s", (unsigned long)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: write the caller-supplied tag s followed by the JSON form of
// pCfg to the trace log. Logs nothing if serialization fails.
void raftCfgLog2(char *s, SRaftCfg *pCfg) {
  char *serialized = raftCfg2Str(pCfg);
  if (serialized == NULL) {
    return;
  }
  // strlen() yields size_t; convert explicitly to match the %lu conversion.
  sTrace("raftCfgLog2 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}