syncRaftCfg.c 9.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17 18
#include "syncRaftCfg.h"
#include "syncUtil.h"
19 20
#include "tjson.h"

C
cadem 已提交
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
/*
 * Translate a sync role into the textual name used in the sync cfg file.
 *
 * Returns "voter" or "learner" for the two known roles and "unknown" for any
 * other value. The result points at a string literal and must not be freed.
 */
const char* syncRoleToStr(ESyncRole role) {
  if (role == TAOS_SYNC_ROLE_VOTER) {
    return "voter";
  }
  if (role == TAOS_SYNC_ROLE_LEARNER) {
    return "learner";
  }
  return "unknown";
}

/*
 * Parse a role name back into an ESyncRole.
 *
 * "voter" maps to TAOS_SYNC_ROLE_VOTER and "learner" to TAOS_SYNC_ROLE_LEARNER;
 * every other string yields TAOS_SYNC_ROLE_ERROR. `str` is handed straight to
 * strcmp, so it must be a valid NUL-terminated string.
 */
const ESyncRole syncStrToRole(char* str) {
  ESyncRole parsed = TAOS_SYNC_ROLE_ERROR;

  if (strcmp(str, "voter") == 0) {
    parsed = TAOS_SYNC_ROLE_VOTER;
  } else if (strcmp(str, "learner") == 0) {
    parsed = TAOS_SYNC_ROLE_LEARNER;
  }

  return parsed;
}

43 44
/*
 * Encode an SSyncCfg into the JSON object `pJson`.
 *
 * Writes the scalar fields "totalReplicaNum", "replicaNum" and "myIndex", then
 * a "nodeInfo" array with one object per node (totalReplicaNum entries), each
 * holding "nodePort", "nodeFqdn", "nodeId", "clusterId" and "nodeRole".
 *
 * pObj  - pointer to the SSyncCfg to encode.
 * pJson - destination JSON object.
 *
 * Returns 0 on success, -1 as soon as any JSON operation fails.
 */
static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
  SSyncCfg *pCfg = (SSyncCfg *)pObj;

  if (tjsonAddDoubleToObject(pJson, "totalReplicaNum", pCfg->totalReplicaNum) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex) < 0) return -1;

  SJson *nodeInfo = tjsonCreateArray();
  if (nodeInfo == NULL) return -1;
  if (tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo) < 0) {
    // The array was never attached to pJson, so nobody else will free it.
    tjsonDelete(nodeInfo);
    return -1;
  }

  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SJson *info = tjsonCreateObject();
    if (info == NULL) return -1;

    // `info` is still detached until the final append succeeds; on any failure
    // along the way it has to be released here. The || chain short-circuits,
    // so the fields are added in the same order as before.
    if (tjsonAddDoubleToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort) < 0 ||
        tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn) < 0 ||
        tjsonAddIntegerToObject(info, "nodeId", pCfg->nodeInfo[i].nodeId) < 0 ||
        tjsonAddIntegerToObject(info, "clusterId", pCfg->nodeInfo[i].clusterId) < 0 ||
        tjsonAddStringToObject(info, "nodeRole", syncRoleToStr(pCfg->nodeInfo[i].nodeRole)) < 0 ||
        tjsonAddItemToArray(nodeInfo, info) < 0) {
      tjsonDelete(info);
      return -1;
    }
  }

  return 0;
}

66 67 68
/*
 * Encode an SRaftCfg into the JSON object `pJson`.
 *
 * Emits the nested "SSyncCfg" object (via syncEncodeSyncCfg), the scalar
 * fields "isStandBy", "snapshotStrategy", "batchSize", "lastConfigIndex" and
 * "configIndexCount", and a "configIndexArr" array containing one
 * {"index": <value>} object per stored config index.
 *
 * pObj  - pointer to the SRaftCfg to encode.
 * pJson - destination JSON object.
 *
 * Returns 0 on success, -1 as soon as any JSON operation fails.
 */
static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
  SRaftCfg *pCfg = (SRaftCfg *)pObj;
  if (tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "isStandBy", pCfg->isStandBy) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "batchSize", pCfg->batchSize) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "lastConfigIndex", pCfg->lastConfigIndex) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "configIndexCount", pCfg->configIndexCount) < 0) return -1;

  SJson *configIndexArr = tjsonCreateArray();
  if (configIndexArr == NULL) return -1;
  if (tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr) < 0) {
    // Not attached to pJson, so it must be released here to avoid a leak.
    tjsonDelete(configIndexArr);
    return -1;
  }

  for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
    SJson *configIndex = tjsonCreateObject();
    if (configIndex == NULL) return -1;

    // The entry is owned by this function until it is appended to the array.
    if (tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]) < 0 ||
        tjsonAddItemToArray(configIndexArr, configIndex) < 0) {
      tjsonDelete(configIndex);
      return -1;
    }
  }

  return 0;
}

88 89 90 91 92 93 94 95 96
/*
 * Persist pNode->raftCfg to the file named by pNode->configPath.
 *
 * The config is serialized as JSON under the "RaftCfg" key, written to
 * "<configPath>.bak", fsync'ed and closed, and the .bak file is then renamed
 * onto the real path.
 *
 * Returns 0 on success and -1 on failure. While the JSON text is being built
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY; it is cleared before the file
 * operations, and on failure a terrno that is still 0 is filled in from errno.
 * Failures are logged.
 */
int32_t syncWriteCfgFile(SSyncNode *pNode) {
  int32_t     code = -1;
  int32_t     len = 0;
  char       *jsonText = NULL;
  SJson      *pRoot = NULL;
  TdFilePtr   pBak = NULL;
  const char *cfgPath = pNode->configPath;
  char        bakPath[PATH_MAX] = {0};

  snprintf(bakPath, sizeof(bakPath), "%s.bak", cfgPath);

  // Stage 1: build the JSON text. Any failure here is reported as OOM.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  pRoot = tjsonCreateObject();
  if (pRoot == NULL) goto _OVER;
  if (tjsonAddObject(pRoot, "RaftCfg", syncEncodeRaftCfg, &pNode->raftCfg) < 0) goto _OVER;
  jsonText = tjsonToString(pRoot);
  if (jsonText == NULL) goto _OVER;
  terrno = 0;

  // Stage 2: write and flush the temporary file.
  pBak = taosOpenFile(bakPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pBak == NULL) goto _OVER;

  len = strlen(jsonText);
  if (taosWriteFile(pBak, jsonText, len) <= 0) goto _OVER;
  if (taosFsyncFile(pBak) < 0) goto _OVER;

  // Stage 3: close first, then move the temporary file over the real one.
  // NOTE(review): presumably taosCloseFile resets pBak to NULL so the cleanup
  // below does not close it a second time - confirm against its definition.
  taosCloseFile(&pBak);
  if (taosRenameFile(bakPath, cfgPath) != 0) goto _OVER;

  code = 0;
  sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64, pNode->vgId, cfgPath, len,
        pNode->raftCfg.lastConfigIndex);

_OVER:
  if (pRoot != NULL) tjsonDelete(pRoot);
  if (jsonText != NULL) taosMemoryFree(jsonText);
  if (pBak != NULL) taosCloseFile(&pBak);

  if (code != 0) {
    if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, cfgPath, terrstr());
  }
  return code;
}

132 133 134 135
static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) {
  SSyncCfg *pCfg = (SSyncCfg *)pObj;
  int32_t   code = 0;

C
cadem 已提交
136
  tjsonGetInt32ValueFromDouble(pJson, "totalReplicaNum", pCfg->totalReplicaNum, code);
S
Shengliang Guan 已提交
137
  tjsonGetInt32ValueFromDouble(pJson, "replicaNum", pCfg->replicaNum, code);
138
  if (code < 0) return -1;
S
Shengliang Guan 已提交
139
  tjsonGetInt32ValueFromDouble(pJson, "myIndex", pCfg->myIndex, code);
140 141 142 143
  if (code < 0) return -1;

  SJson *nodeInfo = tjsonGetObjectItem(pJson, "nodeInfo");
  if (nodeInfo == NULL) return -1;
C
cadem 已提交
144
  pCfg->totalReplicaNum = tjsonGetArraySize(nodeInfo);
145

C
cadem 已提交
146
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
147 148
    SJson *info = tjsonGetArrayItem(nodeInfo, i);
    if (info == NULL) return -1;
S
Shengliang Guan 已提交
149
    tjsonGetUInt16ValueFromDouble(info, "nodePort", pCfg->nodeInfo[i].nodePort, code);
150
    if (code < 0) return -1;
S
Shengliang Guan 已提交
151
    code = tjsonGetStringValue(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn);
152 153 154
    if (code < 0) return -1;
    tjsonGetNumberValue(info, "nodeId", pCfg->nodeInfo[i].nodeId, code);
    tjsonGetNumberValue(info, "clusterId", pCfg->nodeInfo[i].clusterId, code);
C
cadem 已提交
155
    char role[10] = {0};
C
cadem 已提交
156 157
    code = tjsonGetStringValue(info, "nodeRole", role);
    if(code < 0) return -1;
C
cadem 已提交
158 159 160 161 162 163
    if(strlen(role) != 0){
      pCfg->nodeInfo[i].nodeRole = syncStrToRole(role);
    }
    else{
      pCfg->nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
M
Minghao Li 已提交
164 165
  }

166
  return 0;
M
Minghao Li 已提交
167 168
}

169 170 171
static int32_t syncDecodeRaftCfg(const SJson *pJson, void *pObj) {
  SRaftCfg *pCfg = (SRaftCfg *)pObj;
  int32_t   code = 0;
M
Minghao Li 已提交
172

173
  if (tjsonToObject(pJson, "SSyncCfg", syncDecodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1;
M
Minghao Li 已提交
174

S
Shengliang Guan 已提交
175
  tjsonGetInt8ValueFromDouble(pJson, "isStandBy", pCfg->isStandBy, code);
176
  if (code < 0) return -1;
S
Shengliang Guan 已提交
177
  tjsonGetInt8ValueFromDouble(pJson, "snapshotStrategy", pCfg->snapshotStrategy, code);
178
  if (code < 0) return -1;
S
Shengliang Guan 已提交
179
  tjsonGetInt32ValueFromDouble(pJson, "batchSize", pCfg->batchSize, code);
180 181 182
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "lastConfigIndex", pCfg->lastConfigIndex, code);
  if (code < 0) return -1;
S
Shengliang Guan 已提交
183
  tjsonGetInt32ValueFromDouble(pJson, "configIndexCount", pCfg->configIndexCount, code);
M
Minghao Li 已提交
184

185 186
  SJson *configIndexArr = tjsonGetObjectItem(pJson, "configIndexArr");
  if (configIndexArr == NULL) return -1;
M
Minghao Li 已提交
187

188 189 190 191 192 193
  pCfg->configIndexCount = tjsonGetArraySize(configIndexArr);
  for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
    SJson *configIndex = tjsonGetArrayItem(configIndexArr, i);
    if (configIndex == NULL) return -1;
    tjsonGetNumberValue(configIndex, "index", pCfg->configIndexArr[i], code);
    if (code < 0) return -1;
M
Minghao Li 已提交
194 195 196 197 198
  }

  return 0;
}

199 200 201 202 203 204 205
/*
 * Load pNode->raftCfg from the file named by pNode->configPath.
 *
 * The whole file is read into a NUL-terminated heap buffer, parsed as JSON,
 * and the "RaftCfg" object is decoded into pNode->raftCfg.
 *
 * Returns 0 on success and -1 on failure. On failure terrno is set: from
 * errno for open/fstat/read errors, TSDB_CODE_OUT_OF_MEMORY if the buffer
 * cannot be allocated, and TSDB_CODE_INVALID_JSON_FORMAT if parsing or
 * decoding fails. Failures are logged.
 */
int32_t syncReadCfgFile(SSyncNode *pNode) {
  int32_t     code = -1;
  TdFilePtr   pFile = NULL;
  char       *pData = NULL;
  SJson      *pJson = NULL;
  const char *file = pNode->configPath;
  SRaftCfg   *pCfg = &pNode->raftCfg;

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  int64_t size = 0;
  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to fstat sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  // One extra byte for the terminating NUL required by the JSON parser input.
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (taosReadFile(pFile, pData, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  pData[size] = '\0';

  pJson = tjsonParse(pData);
  if (pJson == NULL) {
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  if (tjsonToObject(pJson, "RaftCfg", syncDecodeRaftCfg, (void *)pCfg) < 0) {
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  code = 0;
  sInfo("vgId:%d, succeed to read sync cfg file %s", pNode->vgId, file);

_OVER:
  if (pData != NULL) taosMemoryFree(pData);
  // Release through the tjson wrapper, matching how the write path frees its tree.
  if (pJson != NULL) tjsonDelete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);

  if (code != 0) {
    sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr());
  }
  return code;
}

260 261
/*
 * Append a config index to pNode->raftCfg.configIndexArr.
 *
 * pNode    - sync node whose raft config is updated.
 * cfgIndex - index value to record.
 *
 * Returns 0 after storing the value and incrementing configIndexCount, or -1
 * without modifying anything when MAX_CONFIG_INDEX_COUNT entries are already
 * stored.
 */
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
  SRaftCfg *pCfg = &pNode->raftCfg;

  // Reject only when the array is full. The previous `<` test was inverted:
  // it refused every append while there was room and allowed the write once
  // the count had reached the limit.
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = cfgIndex;
  pCfg->configIndexCount++;
  return 0;
}