syncRaftStore.c 7.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftStore.h"
M
Minghao Li 已提交
17
#include "cJSON.h"
M
Minghao Li 已提交
18 19
#include "syncEnv.h"
#include "syncUtil.h"
M
Minghao Li 已提交
20

M
Minghao Li 已提交
21 22 23
// private function
static int32_t raftStoreInit(SRaftStore *pRaftStore);
static bool    raftStoreFileExist(char *path);
M
Minghao Li 已提交
24

M
Minghao Li 已提交
25
// public function
M
Minghao Li 已提交
26 27 28
SRaftStore *raftStoreOpen(const char *path) {
  int32_t ret;

wafwerar's avatar
wafwerar 已提交
29
  SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore));
M
Minghao Li 已提交
30 31 32 33 34 35 36
  if (pRaftStore == NULL) {
    sError("raftStoreOpen malloc error");
    return NULL;
  }
  memset(pRaftStore, 0, sizeof(*pRaftStore));
  snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);

37
  char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
M
Minghao Li 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
  memset(storeBuf, 0, sizeof(storeBuf));

  if (!raftStoreFileExist(pRaftStore->path)) {
    ret = raftStoreInit(pRaftStore);
    assert(ret == 0);
  }

  pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
  assert(pRaftStore->pFile != NULL);

  int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE);
  assert(len == RAFT_STORE_BLOCK_SIZE);

  ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
  assert(ret == 0);

  return pRaftStore;
}

static int32_t raftStoreInit(SRaftStore *pRaftStore) {
  assert(pRaftStore != NULL);

60
  pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE);
M
Minghao Li 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
  assert(pRaftStore->pFile != NULL);

  pRaftStore->currentTerm = 0;
  pRaftStore->voteFor.addr = 0;
  pRaftStore->voteFor.vgId = 0;

  int32_t ret = raftStorePersist(pRaftStore);
  assert(ret == 0);

  taosCloseFile(&pRaftStore->pFile);
  return 0;
}

int32_t raftStoreClose(SRaftStore *pRaftStore) {
  assert(pRaftStore != NULL);

  taosCloseFile(&pRaftStore->pFile);
wafwerar's avatar
wafwerar 已提交
78
  taosMemoryFree(pRaftStore);
M
Minghao Li 已提交
79 80 81 82 83 84 85 86
  pRaftStore = NULL;
  return 0;
}

int32_t raftStorePersist(SRaftStore *pRaftStore) {
  assert(pRaftStore != NULL);

  int32_t ret;
87
  char    storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
M
Minghao Li 已提交
88 89 90 91 92 93 94 95 96 97 98 99
  ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
  assert(ret == 0);

  taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET);

  ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf));
  assert(ret == RAFT_STORE_BLOCK_SIZE);

  taosFsyncFile(pRaftStore->pFile);
  return 0;
}

M
Minghao Li 已提交
100 101 102 103
static bool raftStoreFileExist(char *path) {
  bool b = taosStatFile(path, NULL, NULL) >= 0;
  return b;
}
M
Minghao Li 已提交
104 105 106 107 108

int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
  assert(pRaftStore != NULL);

  cJSON *pRoot = cJSON_CreateObject();
M
Minghao Li 已提交
109

110
  char u64Buf[128] = {0};
M
Minghao Li 已提交
111 112 113 114 115 116
  snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->currentTerm);
  cJSON_AddStringToObject(pRoot, "current_term", u64Buf);

  snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->voteFor.addr);
  cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);

M
Minghao Li 已提交
117 118
  cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);

M
Minghao Li 已提交
119
  uint64_t u64 = pRaftStore->voteFor.addr;
120
  char     host[128] = {0};
M
Minghao Li 已提交
121 122 123 124 125
  uint16_t port;
  syncUtilU642Addr(u64, host, sizeof(host), &port);
  cJSON_AddStringToObject(pRoot, "addr_host", host);
  cJSON_AddNumberToObject(pRoot, "addr_port", port);

M
Minghao Li 已提交
126 127 128 129 130
  char *serialized = cJSON_Print(pRoot);
  int   len2 = strlen(serialized);
  assert(len2 < len);
  memset(buf, 0, len);
  snprintf(buf, len, "%s", serialized);
wafwerar's avatar
wafwerar 已提交
131
  taosMemoryFree(serialized);
M
Minghao Li 已提交
132 133 134 135

  cJSON_Delete(pRoot);
  return 0;
}
M
Minghao Li 已提交
136

M
Minghao Li 已提交
137 138
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
  assert(pRaftStore != NULL);
M
Minghao Li 已提交
139

M
Minghao Li 已提交
140 141
  assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
  cJSON *pRoot = cJSON_Parse(buf);
M
Minghao Li 已提交
142

M
Minghao Li 已提交
143
  cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
M
Minghao Li 已提交
144 145
  assert(cJSON_IsString(pCurrentTerm));
  sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm));
M
Minghao Li 已提交
146

M
Minghao Li 已提交
147
  cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
M
Minghao Li 已提交
148 149
  assert(cJSON_IsString(pVoteForAddr));
  sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr));
M
Minghao Li 已提交
150

M
Minghao Li 已提交
151 152
  cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
  pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
M
Minghao Li 已提交
153

M
Minghao Li 已提交
154 155 156
  cJSON_Delete(pRoot);
  return 0;
}
M
Minghao Li 已提交
157

M
Minghao Li 已提交
158 159
bool raftStoreHasVoted(SRaftStore *pRaftStore) {
  bool b = syncUtilEmptyId(&(pRaftStore->voteFor));
M
Minghao Li 已提交
160
  return (!b);
M
Minghao Li 已提交
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
}

void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) {
  assert(!syncUtilEmptyId(pRaftId));
  pRaftStore->voteFor = *pRaftId;
  raftStorePersist(pRaftStore);
}

void raftStoreClearVote(SRaftStore *pRaftStore) {
  pRaftStore->voteFor = EMPTY_RAFT_ID;
  raftStorePersist(pRaftStore);
}

void raftStoreNextTerm(SRaftStore *pRaftStore) {
  ++(pRaftStore->currentTerm);
  raftStorePersist(pRaftStore);
}

void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
  pRaftStore->currentTerm = term;
  raftStorePersist(pRaftStore);
}

M
Minghao Li 已提交
184 185
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; }

M
Minghao Li 已提交
186
cJSON *raftStore2Json(SRaftStore *pRaftStore) {
187
  char   u64buf[128] = {0};
M
Minghao Li 已提交
188 189 190 191 192 193 194 195 196 197 198
  cJSON *pRoot = cJSON_CreateObject();

  if (pRaftStore != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->currentTerm);
    cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);

    cJSON *pVoteFor = cJSON_CreateObject();
    snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->voteFor.addr);
    cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
    {
      uint64_t u64 = pRaftStore->voteFor.addr;
199
      char     host[128] = {0};
M
Minghao Li 已提交
200 201 202 203 204 205 206
      uint16_t port;
      syncUtilU642Addr(u64, host, sizeof(host), &port);
      cJSON_AddStringToObject(pVoteFor, "addr_host", host);
      cJSON_AddNumberToObject(pVoteFor, "addr_port", port);
    }
    cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId);
    cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor);
M
Minghao Li 已提交
207 208 209

    int hasVoted = raftStoreHasVoted(pRaftStore);
    cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted);
M
Minghao Li 已提交
210 211 212 213 214 215 216 217 218
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SRaftStore", pRoot);
  return pJson;
}

char *raftStore2Str(SRaftStore *pRaftStore) {
  cJSON *pJson = raftStore2Json(pRaftStore);
M
Minghao Li 已提交
219
  char * serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
220 221 222 223
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
224 225
// for debug -------------------
void raftStorePrint(SRaftStore *pObj) {
M
Minghao Li 已提交
226
  char *serialized = raftStore2Str(pObj);
M
Minghao Li 已提交
227 228
  printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
229
  taosMemoryFree(serialized);
M
Minghao Li 已提交
230 231 232
}

void raftStorePrint2(char *s, SRaftStore *pObj) {
M
Minghao Li 已提交
233
  char *serialized = raftStore2Str(pObj);
M
Minghao Li 已提交
234 235
  printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
236
  taosMemoryFree(serialized);
M
Minghao Li 已提交
237 238
}
void raftStoreLog(SRaftStore *pObj) {
M
Minghao Li 已提交
239
  char *serialized = raftStore2Str(pObj);
M
Minghao Li 已提交
240
  sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
241
  taosMemoryFree(serialized);
M
Minghao Li 已提交
242 243 244
}

void raftStoreLog2(char *s, SRaftStore *pObj) {
M
Minghao Li 已提交
245
  char *serialized = raftStore2Str(pObj);
M
Minghao Li 已提交
246
  sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
wafwerar's avatar
wafwerar 已提交
247
  taosMemoryFree(serialized);
M
Minghao Li 已提交
248
}