syncRaftStore.c 6.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftStore.h"
M
Minghao Li 已提交
18
#include "syncUtil.h"
S
Shengliang Guan 已提交
19
#include "tjson.h"
M
Minghao Li 已提交
20

21 22 23
// Forward declarations: raftStoreReadFile() calls raftStoreWriteFile() before
// the latter is defined further down in this file.
int32_t raftStoreReadFile(SSyncNode *pNode);
int32_t raftStoreWriteFile(SSyncNode *pNode);

S
Shengliang Guan 已提交
24 25
// Fill pStore from the JSON object pJson.
//
// The keys "current_term", "vote_for_addr" and "vote_for_vgid" are read in
// that order. Decoding stops at the first lookup that reports a negative code.
//
// Returns 0 when all three lookups succeed, -1 otherwise.
static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) {
  int32_t rc = 0;

  tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, rc);
  if (rc < 0) {
    return -1;
  }

  tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, rc);
  if (rc < 0) {
    return -1;
  }

  // The vgId is stored as a JSON double (see raftStoreEncode).
  tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, rc);
  return (rc < 0) ? -1 : 0;
}

S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
// Load the persisted raft state of pNode from pNode->raftStorePath into
// pNode->raftStore.
//
// If the file cannot be stat'ed, the store is reset to zero values
// (currentTerm, voteFor.addr, voteFor.vgId) and written out through
// raftStoreWriteFile(); that call's result is returned.
// Otherwise the whole file is read into memory, parsed as JSON and decoded
// with raftStoreDecode().
//
// Returns 0 on success. On a read/parse failure returns -1 with terrno set.
int32_t raftStoreReadFile(SSyncNode *pNode) {
  int32_t     code = -1;
  int64_t     size = 0;
  TdFilePtr   pFile = NULL;
  char       *pData = NULL;
  SJson      *pJson = NULL;
  const char *file = pNode->raftStorePath;
  SRaftStore *pStore = &pNode->raftStore;

  if (taosStatFile(file, NULL, NULL) < 0) {
    sInfo("vgId:%d, raft store file:%s not exist, use default value", pNode->vgId, file);
    pStore->currentTerm = 0;
    pStore->voteFor.addr = 0;
    pStore->voteFor.vgId = 0;
    return raftStoreWriteFile(pNode);
  }

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  // One extra byte for the terminating NUL required by the JSON parser input.
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  // A short read is treated as a failure: the file must be read completely.
  if (taosReadFile(pFile, pData, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
    goto _OVER;
  }

  pData[size] = '\0';

  pJson = tjsonParse(pData);
  if (pJson == NULL) {
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  if (raftStoreDecode(pJson, pStore) < 0) {
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  code = 0;
  sInfo("vgId:%d, succceed to read raft store file %s", pNode->vgId, file);

_OVER:
  if (pData != NULL) taosMemoryFree(pData);
  // Release through the tjson wrapper, matching tjsonParse() above and the
  // cleanup in raftStoreWriteFile(), instead of calling cJSON directly.
  if (pJson != NULL) tjsonDelete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);

  if (code != 0) {
    sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
  }
  return code;
}
M
Minghao Li 已提交
105

S
Shengliang Guan 已提交
106 107 108 109
// Add the fields of pStore to the JSON object pJson under the keys
// "current_term", "vote_for_addr" and "vote_for_vgid" (the last one as a
// double). Fields are added in that order and encoding stops at the first
// failure.
//
// Returns 0 on success, -1 if any add call returns a negative value.
static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) {
  // Short-circuit evaluation keeps the original order and early exit.
  if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0 ||
      tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0 ||
      tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) {
    return -1;
  }

  return 0;
}
M
Minghao Li 已提交
112

S
Shengliang Guan 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
// Persist pNode->raftStore to pNode->raftStorePath.
//
// The store is encoded as JSON and written to "<raftStorePath>.bak", which is
// fsync'ed and closed before being renamed over the real path, so the real
// file is only replaced once the new content has been written in full.
//
// Returns 0 on success. On failure returns -1 with terrno set: out-of-memory
// for JSON build/encode/serialize failures, otherwise derived from errno.
int32_t raftStoreWriteFile(SSyncNode *pNode) {
  int32_t     code = -1;
  char       *buffer = NULL;
  SJson      *pJson = NULL;
  TdFilePtr   pFile = NULL;
  const char *realfile = pNode->raftStorePath;
  SRaftStore *pStore = &pNode->raftStore;
  char        file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s.bak", realfile);

  // Any failure while building the JSON text is reported as out of memory.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  pJson = tjsonCreateObject();
  if (pJson == NULL) goto _OVER;
  if (raftStoreEncode(pJson, pStore) != 0) goto _OVER;
  buffer = tjsonToString(pJson);
  if (buffer == NULL) goto _OVER;
  // From here on failures are I/O errors; terrno is filled in from errno below.
  terrno = 0;

  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) goto _OVER;

  int32_t len = strlen(buffer);
  // Require the complete buffer to be written; a short write must not be
  // renamed over the real file (same strictness as the read path).
  if (taosWriteFile(pFile, buffer, len) != len) goto _OVER;
  if (taosFsyncFile(pFile) < 0) goto _OVER;

  // Close before renaming so the temporary file is no longer open.
  taosCloseFile(&pFile);
  if (taosRenameFile(file, realfile) != 0) goto _OVER;

  code = 0;
  sInfo("vgId:%d, succeed to write raft store file:%s, len:%d", pNode->vgId, realfile, len);

_OVER:
  if (pJson != NULL) tjsonDelete(pJson);
  if (buffer != NULL) taosMemoryFree(buffer);
  if (pFile != NULL) taosCloseFile(&pFile);

  if (code != 0) {
    if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
    sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr());
  }
  return code;
}
M
Minghao Li 已提交
155

156 157 158 159 160 161 162
// Initialize the raft store mutex of pNode, then load the persisted state
// through raftStoreReadFile(). Returns that call's result.
int32_t raftStoreOpen(SSyncNode *pNode) {
  SRaftStore *pStore = &pNode->raftStore;

  taosThreadMutexInit(&pStore->mutex, NULL);

  int32_t code = raftStoreReadFile(pNode);
  return code;
}

// Destroy the raft store mutex of pNode.
void raftStoreClose(SSyncNode *pNode) {
  SRaftStore *pStore = &pNode->raftStore;
  taosThreadMutexDestroy(&pStore->mutex);
}

S
Shengliang Guan 已提交
163
// Report whether the store holds a vote: returns true when
// syncUtilEmptyId() says voteFor is not empty. The check is made while
// holding the raft store mutex.
bool raftStoreHasVoted(SSyncNode *pNode) {
  SRaftStore *pStore = &pNode->raftStore;

  taosThreadMutexLock(&pStore->mutex);
  bool voted = !syncUtilEmptyId(&pStore->voteFor);
  taosThreadMutexUnlock(&pStore->mutex);

  return voted;
}

S
Shengliang Guan 已提交
170
// Record *pRaftId as the vote of pNode and write the store to disk, all under
// the raft store mutex. The result of the write is deliberately ignored.
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) {
  SRaftStore *pStore = &pNode->raftStore;

  taosThreadMutexLock(&pStore->mutex);

  pStore->voteFor = *pRaftId;
  (void)raftStoreWriteFile(pNode);

  taosThreadMutexUnlock(&pStore->mutex);
}

S
Shengliang Guan 已提交
177
// Reset the vote of pNode to EMPTY_RAFT_ID and write the store to disk, all
// under the raft store mutex. The result of the write is deliberately ignored.
void raftStoreClearVote(SSyncNode *pNode) {
  SRaftStore *pStore = &pNode->raftStore;

  taosThreadMutexLock(&pStore->mutex);

  pStore->voteFor = EMPTY_RAFT_ID;
  (void)raftStoreWriteFile(pNode);

  taosThreadMutexUnlock(&pStore->mutex);
}

S
Shengliang Guan 已提交
184
// Advance the current term of pNode by one and write the store to disk, all
// under the raft store mutex. The result of the write is deliberately ignored.
void raftStoreNextTerm(SSyncNode *pNode) {
  SRaftStore *pStore = &pNode->raftStore;

  taosThreadMutexLock(&pStore->mutex);

  pStore->currentTerm += 1;
  (void)raftStoreWriteFile(pNode);

  taosThreadMutexUnlock(&pStore->mutex);
}

S
Shengliang Guan 已提交
191
// Raise the current term of pNode to `term` when `term` is strictly greater
// than the stored one, and write the store to disk in that case. A `term`
// that is not greater leaves the store and the file untouched. Runs under the
// raft store mutex; the result of the write is deliberately ignored.
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) {
  SRaftStore *pStore = &pNode->raftStore;

  taosThreadMutexLock(&pStore->mutex);

  if (term > pStore->currentTerm) {
    pStore->currentTerm = term;
    (void)raftStoreWriteFile(pNode);
  }

  taosThreadMutexUnlock(&pStore->mutex);
}

// Return the current term of pNode, read while holding the raft store mutex.
SyncTerm raftStoreGetTerm(SSyncNode *pNode) {
  SRaftStore *pStore = &pNode->raftStore;

  taosThreadMutexLock(&pStore->mutex);
  SyncTerm currentTerm = pStore->currentTerm;
  taosThreadMutexUnlock(&pStore->mutex);

  return currentTerm;
}