/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "syncRaftCfg.h"
#include "syncUtil.h"
#include "tjson.h"

/*
 * Serialize an SSyncCfg into the JSON object pJson.
 *
 * Emits the integer fields "replicaNum" and "myIndex", followed by a
 * "nodeInfo" array holding one object per replica with the fields
 * "nodePort", "nodeFqdn", "nodeId" and "clusterId".
 *
 * pObj  - the SSyncCfg to encode.
 * pJson - JSON object receiving the fields.
 *
 * Returns 0 on success, -1 as soon as any JSON call fails.
 */
static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
  SSyncCfg *pCfg = (SSyncCfg *)pObj;
  if (tjsonAddIntegerToObject(pJson, "replicaNum", pCfg->replicaNum) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "myIndex", pCfg->myIndex) < 0) return -1;

  SJson *nodeInfo = tjsonCreateArray();
  if (nodeInfo == NULL) return -1;
  if (tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo) < 0) return -1;

  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SJson *info = tjsonCreateObject();
    if (info == NULL) return -1;

    // Attach the object before filling it (the same order used for the
    // nodeInfo array above). If a field below fails, the object is then still
    // reachable from pJson instead of being orphaned by the early return.
    // NOTE(review): if the attach call itself fails, info is not released
    // here - confirm who owns the item in that case.
    if (tjsonAddItemToArray(nodeInfo, info) < 0) return -1;

    if (tjsonAddIntegerToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort) < 0) return -1;
    if (tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn) < 0) return -1;
    if (tjsonAddIntegerToObject(info, "nodeId", pCfg->nodeInfo[i].nodeId) < 0) return -1;
    if (tjsonAddIntegerToObject(info, "clusterId", pCfg->nodeInfo[i].clusterId) < 0) return -1;
  }

  return 0;
}

/*
 * Serialize an SRaftCfg into the JSON object pJson.
 *
 * Emits the nested "SSyncCfg" object (via syncEncodeSyncCfg), the integer
 * fields "isStandBy", "snapshotStrategy", "batchSize", "lastConfigIndex" and
 * "configIndexCount", and a "configIndexArr" array whose elements are objects
 * of the form {"index": <value>}.
 *
 * pObj  - the SRaftCfg to encode.
 * pJson - JSON object receiving the fields.
 *
 * Returns 0 on success, -1 as soon as any JSON call fails.
 */
static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
  SRaftCfg *pCfg = (SRaftCfg *)pObj;
  if (tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "isStandBy", pCfg->isStandBy) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "batchSize", pCfg->batchSize) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "lastConfigIndex", pCfg->lastConfigIndex) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "configIndexCount", pCfg->configIndexCount) < 0) return -1;

  SJson *configIndexArr = tjsonCreateArray();
  if (configIndexArr == NULL) return -1;
  if (tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr) < 0) return -1;

  for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
    SJson *configIndex = tjsonCreateObject();
    if (configIndex == NULL) return -1;

    // Attach before filling (the same order used for configIndexArr above),
    // so a failure while adding "index" does not orphan the new object.
    // NOTE(review): if the attach call itself fails, configIndex is not
    // released here - confirm who owns the item in that case.
    if (tjsonAddItemToArray(configIndexArr, configIndex) < 0) return -1;

    if (tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]) < 0) return -1;
  }

  return 0;
}

/*
 * Persist pNode->raftCfg as JSON text to pNode->configPath.
 *
 * The text is written and fsync'ed to "<configPath>.bak" first; that file is
 * then renamed onto configPath, so the real file is only replaced after the
 * new content has been written out in full.
 *
 * pNode - sync node whose raftCfg is written; configPath and vgId are read.
 *
 * Returns 0 on success, -1 on failure. On failure terrno is set to
 * TAOS_SYSTEM_ERROR(errno) for open/write/fsync/rename errors and to
 * TSDB_CODE_OUT_OF_MEMORY when building or printing the JSON tree fails.
 */
int32_t syncWriteCfgFile(SSyncNode *pNode) {
  int32_t     code = -1;
  int32_t     len = 0;
  char       *buffer = NULL;
  SJson      *pJson = NULL;
  TdFilePtr   pFile = NULL;
  const char *realfile = pNode->configPath;
  SRaftCfg   *pCfg = &pNode->raftCfg;
  char        file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s.bak", realfile);

  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    // Report the path that was actually opened (the .bak file).
    sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  // Any failure while building or printing the JSON tree is reported as OOM.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  pJson = tjsonCreateObject();
  if (pJson == NULL) goto _OVER;
  if (tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg) < 0) goto _OVER;

  buffer = tjsonToString(pJson);
  if (buffer == NULL) goto _OVER;

  len = strlen(buffer);
  if (taosWriteFile(pFile, buffer, len) <= 0) {
    // I/O failure: do not leave the stale out-of-memory code in terrno.
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }
  if (taosFsyncFile(pFile) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  // Close before renaming; the cleanup below closes again only if pFile is
  // still non-NULL.
  taosCloseFile(&pFile);

  if (taosRenameFile(file, realfile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to rename sync cfg file:%s to %s since %s", pNode->vgId, file, realfile, terrstr());
    goto _OVER;
  }

  code = 0;
  sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d", pNode->vgId, realfile, len);

_OVER:
  if (pJson != NULL) tjsonDelete(pJson);
  if (buffer != NULL) taosMemoryFree(buffer);
  if (pFile != NULL) taosCloseFile(&pFile);

  if (code != 0) {
    sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, terrstr());
  }
  return code;
}

/*
 * Populate an SSyncCfg from the JSON object pJson.
 *
 * Reads "replicaNum" and "myIndex", then walks the "nodeInfo" array and fills
 * pCfg->nodeInfo[i] from each element. replicaNum is finally taken from the
 * length of the "nodeInfo" array, overriding the "replicaNum" field.
 *
 * pJson - JSON object to decode.
 * pObj  - the SSyncCfg to fill.
 *
 * Returns 0 on success, -1 if a checked field or the "nodeInfo" array is
 * missing or cannot be read.
 */
static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) {
  SSyncCfg *pCfg = (SSyncCfg *)pObj;
  int32_t   code = 0;

  tjsonGetNumberValue(pJson, "replicaNum", pCfg->replicaNum, code);
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "myIndex", pCfg->myIndex, code);
  if (code < 0) return -1;

  SJson *nodeInfo = tjsonGetObjectItem(pJson, "nodeInfo");
  if (nodeInfo == NULL) return -1;

  // NOTE(review): the array length is not checked against the capacity of
  // pCfg->nodeInfo - confirm the limit and reject oversized input.
  pCfg->replicaNum = tjsonGetArraySize(nodeInfo);

  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SJson *info = tjsonGetArrayItem(nodeInfo, i);
    if (info == NULL) return -1;
    tjsonGetNumberValue(info, "nodePort", pCfg->nodeInfo[i].nodePort, code);
    if (code < 0) return -1;
    // Capture the result: without the assignment the check below would test
    // the stale value left by the previous read.
    code = tjsonGetStringValue(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn);
    if (code < 0) return -1;
    // nodeId and clusterId are read without failing on error, as before.
    // NOTE(review): presumably optional for files that lack them - confirm.
    tjsonGetNumberValue(info, "nodeId", pCfg->nodeInfo[i].nodeId, code);
    tjsonGetNumberValue(info, "clusterId", pCfg->nodeInfo[i].clusterId, code);
  }

  return 0;
}

static int32_t syncDecodeRaftCfg(const SJson *pJson, void *pObj) {
  SRaftCfg *pCfg = (SRaftCfg *)pObj;
  int32_t   code = 0;
M
Minghao Li 已提交
144

145
  if (tjsonToObject(pJson, "SSyncCfg", syncDecodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1;
M
Minghao Li 已提交
146

147 148 149 150 151 152 153 154 155
  tjsonGetNumberValue(pJson, "isStandBy", pCfg->isStandBy, code);
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "snapshotStrategy", pCfg->snapshotStrategy, code);
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "batchSize", pCfg->batchSize, code);
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "lastConfigIndex", pCfg->lastConfigIndex, code);
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "configIndexCount", pCfg->configIndexCount, code);
M
Minghao Li 已提交
156

157 158
  SJson *configIndexArr = tjsonGetObjectItem(pJson, "configIndexArr");
  if (configIndexArr == NULL) return -1;
M
Minghao Li 已提交
159

160 161 162 163 164 165
  pCfg->configIndexCount = tjsonGetArraySize(configIndexArr);
  for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
    SJson *configIndex = tjsonGetArrayItem(configIndexArr, i);
    if (configIndex == NULL) return -1;
    tjsonGetNumberValue(configIndex, "index", pCfg->configIndexArr[i], code);
    if (code < 0) return -1;
M
Minghao Li 已提交
166 167 168 169 170
  }

  return 0;
}

/*
 * Load pNode->raftCfg from the JSON file at pNode->configPath.
 *
 * The whole file is read into a NUL-terminated heap buffer, parsed as JSON,
 * and its "RaftCfg" object is decoded into pNode->raftCfg.
 *
 * pNode - sync node to fill; configPath and vgId are read.
 *
 * Returns 0 on success, -1 on failure. On failure terrno is set to
 * TAOS_SYSTEM_ERROR(errno) for open/fstat/read errors,
 * TSDB_CODE_OUT_OF_MEMORY if the buffer cannot be allocated, and
 * TSDB_CODE_INVALID_JSON_FORMAT if parsing or decoding fails.
 */
int32_t syncReadCfgFile(SSyncNode *pNode) {
  int32_t     code = -1;
  int64_t     size = 0;
  TdFilePtr   pFile = NULL;
  char       *pData = NULL;
  SJson      *pJson = NULL;
  const char *file = pNode->configPath;
  SRaftCfg   *pCfg = &pNode->raftCfg;

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to fstat sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  // One extra byte for the terminating NUL needed by the JSON parser.
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (taosReadFile(pFile, pData, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  pData[size] = '\0';

  pJson = tjsonParse(pData);
  if (pJson == NULL) {
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  if (tjsonToObject(pJson, "RaftCfg", syncDecodeRaftCfg, (void *)pCfg) < 0) {
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  code = 0;
  sInfo("vgId:%d, succeed to read sync cfg file %s", pNode->vgId, file);

_OVER:
  if (pData != NULL) taosMemoryFree(pData);
  // Release through the tjson wrapper, matching syncWriteCfgFile.
  if (pJson != NULL) tjsonDelete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);

  if (code != 0) {
    sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr());
  }
  return code;
}

/*
 * Append cfgIndex to pNode->raftCfg.configIndexArr.
 *
 * pNode    - sync node whose raftCfg is updated.
 * cfgIndex - index value to append.
 *
 * Returns 0 after storing the value and incrementing configIndexCount, or -1
 * without modifying anything when the array already holds
 * MAX_CONFIG_INDEX_COUNT entries.
 */
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
  SRaftCfg *pCfg = &pNode->raftCfg;

  // Reject only when full. The previous "<=" refused every append while there
  // was still room and allowed the write once the count exceeded the maximum.
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = cfgIndex;
  pCfg->configIndexCount++;
  return 0;
}