syncRaftCfg.c 8.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17 18
#include "syncRaftCfg.h"
#include "syncUtil.h"
19 20 21 22
#include "tjson.h"

static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
  SSyncCfg *pCfg = (SSyncCfg *)pObj;
S
Shengliang Guan 已提交
23 24
  if (tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex) < 0) return -1;
25 26 27 28 29 30 31

  SJson *nodeInfo = tjsonCreateArray();
  if (nodeInfo == NULL) return -1;
  if (tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo) < 0) return -1;
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SJson *info = tjsonCreateObject();
    if (info == NULL) return -1;
S
Shengliang Guan 已提交
32
    if (tjsonAddDoubleToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort) < 0) return -1;
33 34 35 36
    if (tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn) < 0) return -1;
    if (tjsonAddIntegerToObject(info, "nodeId", pCfg->nodeInfo[i].nodeId) < 0) return -1;
    if (tjsonAddIntegerToObject(info, "clusterId", pCfg->nodeInfo[i].clusterId) < 0) return -1;
    if (tjsonAddItemToArray(nodeInfo, info) < 0) return -1;
37 38 39 40 41
  }

  return 0;
}

42 43 44
static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
  SRaftCfg *pCfg = (SRaftCfg *)pObj;
  if (tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1;
S
Shengliang Guan 已提交
45 46
  if (tjsonAddDoubleToObject(pJson, "isStandBy", pCfg->isStandBy) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy) < 0) return -1;
S
Shengliang Guan 已提交
47 48
  if (tjsonAddDoubleToObject(pJson, "batchSize", pCfg->batchSize) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "lastConfigIndex", pCfg->lastConfigIndex) < 0) return -1;
S
Shengliang Guan 已提交
49
  if (tjsonAddDoubleToObject(pJson, "configIndexCount", pCfg->configIndexCount) < 0) return -1;
50 51 52 53 54 55 56 57 58

  SJson *configIndexArr = tjsonCreateArray();
  if (configIndexArr == NULL) return -1;
  if (tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr) < 0) return -1;
  for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
    SJson *configIndex = tjsonCreateObject();
    if (configIndex == NULL) return -1;
    if (tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]) < 0) return -1;
    if (tjsonAddItemToArray(configIndexArr, configIndex) < 0) return -1;
59 60 61 62 63
  }

  return 0;
}

64 65 66 67 68 69 70 71 72
int32_t syncWriteCfgFile(SSyncNode *pNode) {
  int32_t     code = -1;
  char       *buffer = NULL;
  SJson      *pJson = NULL;
  TdFilePtr   pFile = NULL;
  const char *realfile = pNode->configPath;
  SRaftCfg   *pCfg = &pNode->raftCfg;
  char        file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s.bak", realfile);
73

74 75 76 77 78 79
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  pJson = tjsonCreateObject();
  if (pJson == NULL) goto _OVER;
  if (tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg) < 0) goto _OVER;
  buffer = tjsonToString(pJson);
  if (buffer == NULL) goto _OVER;
S
Shengliang Guan 已提交
80 81 82 83
  terrno = 0;

  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) goto _OVER;
84

85 86 87
  int32_t len = strlen(buffer);
  if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
  if (taosFsyncFile(pFile) < 0) goto _OVER;
M
Minghao Li 已提交
88

S
Shengliang Guan 已提交
89
  taosCloseFile(&pFile);
90
  if (taosRenameFile(file, realfile) != 0) goto _OVER;
M
Minghao Li 已提交
91

92 93
  code = 0;
  sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d", pNode->vgId, realfile, len);
M
Minghao Li 已提交
94

95 96 97 98
_OVER:
  if (pJson != NULL) tjsonDelete(pJson);
  if (buffer != NULL) taosMemoryFree(buffer);
  if (pFile != NULL) taosCloseFile(&pFile);
M
Minghao Li 已提交
99

100
  if (code != 0) {
S
Shengliang Guan 已提交
101
    if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
102
    sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, terrstr());
103 104
  }
  return code;
105 106
}

107 108 109 110
static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) {
  SSyncCfg *pCfg = (SSyncCfg *)pObj;
  int32_t   code = 0;

S
Shengliang Guan 已提交
111
  tjsonGetInt32ValueFromDouble(pJson, "replicaNum", pCfg->replicaNum, code);
112
  if (code < 0) return -1;
S
Shengliang Guan 已提交
113
  tjsonGetInt32ValueFromDouble(pJson, "myIndex", pCfg->myIndex, code);
114 115 116 117 118 119 120 121 122
  if (code < 0) return -1;

  SJson *nodeInfo = tjsonGetObjectItem(pJson, "nodeInfo");
  if (nodeInfo == NULL) return -1;
  pCfg->replicaNum = tjsonGetArraySize(nodeInfo);

  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SJson *info = tjsonGetArrayItem(nodeInfo, i);
    if (info == NULL) return -1;
S
Shengliang Guan 已提交
123
    tjsonGetUInt16ValueFromDouble(info, "nodePort", pCfg->nodeInfo[i].nodePort, code);
124
    if (code < 0) return -1;
S
Shengliang Guan 已提交
125
    code = tjsonGetStringValue(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn);
126 127 128
    if (code < 0) return -1;
    tjsonGetNumberValue(info, "nodeId", pCfg->nodeInfo[i].nodeId, code);
    tjsonGetNumberValue(info, "clusterId", pCfg->nodeInfo[i].clusterId, code);
M
Minghao Li 已提交
129 130
  }

131
  return 0;
M
Minghao Li 已提交
132 133
}

134 135 136
static int32_t syncDecodeRaftCfg(const SJson *pJson, void *pObj) {
  SRaftCfg *pCfg = (SRaftCfg *)pObj;
  int32_t   code = 0;
M
Minghao Li 已提交
137

138
  if (tjsonToObject(pJson, "SSyncCfg", syncDecodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1;
M
Minghao Li 已提交
139

S
Shengliang Guan 已提交
140
  tjsonGetInt8ValueFromDouble(pJson, "isStandBy", pCfg->isStandBy, code);
141
  if (code < 0) return -1;
S
Shengliang Guan 已提交
142
  tjsonGetInt8ValueFromDouble(pJson, "snapshotStrategy", pCfg->snapshotStrategy, code);
143
  if (code < 0) return -1;
S
Shengliang Guan 已提交
144
  tjsonGetInt32ValueFromDouble(pJson, "batchSize", pCfg->batchSize, code);
145 146 147
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "lastConfigIndex", pCfg->lastConfigIndex, code);
  if (code < 0) return -1;
S
Shengliang Guan 已提交
148
  tjsonGetInt32ValueFromDouble(pJson, "configIndexCount", pCfg->configIndexCount, code);
M
Minghao Li 已提交
149

150 151
  SJson *configIndexArr = tjsonGetObjectItem(pJson, "configIndexArr");
  if (configIndexArr == NULL) return -1;
M
Minghao Li 已提交
152

153 154 155 156 157 158
  pCfg->configIndexCount = tjsonGetArraySize(configIndexArr);
  for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
    SJson *configIndex = tjsonGetArrayItem(configIndexArr, i);
    if (configIndex == NULL) return -1;
    tjsonGetNumberValue(configIndex, "index", pCfg->configIndexArr[i], code);
    if (code < 0) return -1;
M
Minghao Li 已提交
159 160 161 162 163
  }

  return 0;
}

164 165 166 167 168 169 170
int32_t syncReadCfgFile(SSyncNode *pNode) {
  int32_t     code = -1;
  TdFilePtr   pFile = NULL;
  char       *pData = NULL;
  SJson      *pJson = NULL;
  const char *file = pNode->configPath;
  SRaftCfg   *pCfg = &pNode->raftCfg;
171

172
  pFile = taosOpenFile(file, TD_FILE_READ);
M
Minghao Li 已提交
173
  if (pFile == NULL) {
174 175 176
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
M
Minghao Li 已提交
177
  }
M
Minghao Li 已提交
178

179 180 181 182 183 184
  int64_t size = 0;
  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to fstat sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }
M
Minghao Li 已提交
185

186 187 188 189 190
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
M
Minghao Li 已提交
191

192 193 194 195 196
  if (taosReadFile(pFile, pData, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }
M
Minghao Li 已提交
197

198
  pData[size] = '\0';
M
Minghao Li 已提交
199

200 201 202 203 204
  pJson = tjsonParse(pData);
  if (pJson == NULL) {
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }
205

206 207 208 209
  if (tjsonToObject(pJson, "RaftCfg", syncDecodeRaftCfg, (void *)pCfg) < 0) {
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }
210

211 212
  code = 0;
  sInfo("vgId:%d, succceed to read sync cfg file %s", pNode->vgId, file);
213

214 215 216 217
_OVER:
  if (pData != NULL) taosMemoryFree(pData);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);
218

219 220
  if (code != 0) {
    sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr());
221
  }
M
Minghao Li 已提交
222 223 224
  return code;
}

225 226 227 228 229
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
  SRaftCfg *pCfg = &pNode->raftCfg;
  if (pCfg->configIndexCount <= MAX_CONFIG_INDEX_COUNT) {
    return -1;
  }
M
Minghao Li 已提交
230

231 232
  pCfg->configIndexArr[pCfg->configIndexCount] = cfgIndex;
  pCfg->configIndexCount++;
M
Minghao Li 已提交
233
  return 0;
234
}