syncRaftStore.c 4.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftStore.h"
M
Minghao Li 已提交
18
#include "syncUtil.h"
M
Minghao Li 已提交
19

M
Minghao Li 已提交
20 21 22
// private function
static int32_t raftStoreInit(SRaftStore *pRaftStore);
static bool    raftStoreFileExist(char *path);
M
Minghao Li 已提交
23

M
Minghao Li 已提交
24
// public function
M
Minghao Li 已提交
25 26 27
SRaftStore *raftStoreOpen(const char *path) {
  int32_t ret;

28
  SRaftStore *pRaftStore = taosMemoryCalloc(1, sizeof(SRaftStore));
M
Minghao Li 已提交
29
  if (pRaftStore == NULL) {
30
    terrno = TSDB_CODE_OUT_OF_MEMORY;
M
Minghao Li 已提交
31 32 33
    return NULL;
  }

34
  snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
M
Minghao Li 已提交
35 36
  if (!raftStoreFileExist(pRaftStore->path)) {
    ret = raftStoreInit(pRaftStore);
M
Minghao Li 已提交
37
    ASSERT(ret == 0);
M
Minghao Li 已提交
38 39
  }

40
  char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
M
Minghao Li 已提交
41
  pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
M
Minghao Li 已提交
42
  ASSERT(pRaftStore->pFile != NULL);
M
Minghao Li 已提交
43 44

  int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE);
45
  ASSERT(len > 0);
M
Minghao Li 已提交
46 47

  ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
M
Minghao Li 已提交
48
  ASSERT(ret == 0);
M
Minghao Li 已提交
49 50 51 52 53

  return pRaftStore;
}

static int32_t raftStoreInit(SRaftStore *pRaftStore) {
M
Minghao Li 已提交
54
  ASSERT(pRaftStore != NULL);
M
Minghao Li 已提交
55

56
  pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE);
M
Minghao Li 已提交
57
  ASSERT(pRaftStore->pFile != NULL);
M
Minghao Li 已提交
58 59 60 61 62 63

  pRaftStore->currentTerm = 0;
  pRaftStore->voteFor.addr = 0;
  pRaftStore->voteFor.vgId = 0;

  int32_t ret = raftStorePersist(pRaftStore);
M
Minghao Li 已提交
64
  ASSERT(ret == 0);
M
Minghao Li 已提交
65 66 67 68 69 70

  taosCloseFile(&pRaftStore->pFile);
  return 0;
}

int32_t raftStoreClose(SRaftStore *pRaftStore) {
71
  if (pRaftStore == NULL) return 0;
M
Minghao Li 已提交
72 73

  taosCloseFile(&pRaftStore->pFile);
wafwerar's avatar
wafwerar 已提交
74
  taosMemoryFree(pRaftStore);
M
Minghao Li 已提交
75 76 77 78 79
  pRaftStore = NULL;
  return 0;
}

int32_t raftStorePersist(SRaftStore *pRaftStore) {
M
Minghao Li 已提交
80
  ASSERT(pRaftStore != NULL);
M
Minghao Li 已提交
81 82

  int32_t ret;
83
  char    storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
M
Minghao Li 已提交
84
  ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
M
Minghao Li 已提交
85
  ASSERT(ret == 0);
M
Minghao Li 已提交
86 87 88 89

  taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET);

  ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf));
M
Minghao Li 已提交
90
  ASSERT(ret == RAFT_STORE_BLOCK_SIZE);
M
Minghao Li 已提交
91 92 93 94 95

  taosFsyncFile(pRaftStore->pFile);
  return 0;
}

M
Minghao Li 已提交
96 97 98 99
static bool raftStoreFileExist(char *path) {
  bool b = taosStatFile(path, NULL, NULL) >= 0;
  return b;
}
M
Minghao Li 已提交
100 101

int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
M
Minghao Li 已提交
102
  ASSERT(pRaftStore != NULL);
M
Minghao Li 已提交
103 104

  cJSON *pRoot = cJSON_CreateObject();
M
Minghao Li 已提交
105

106
  char u64Buf[128] = {0};
M
Minghao Li 已提交
107
  snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->currentTerm);
M
Minghao Li 已提交
108 109
  cJSON_AddStringToObject(pRoot, "current_term", u64Buf);

M
Minghao Li 已提交
110
  snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->voteFor.addr);
M
Minghao Li 已提交
111 112
  cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);

M
Minghao Li 已提交
113 114 115 116
  cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);

  char *serialized = cJSON_Print(pRoot);
  int   len2 = strlen(serialized);
M
Minghao Li 已提交
117
  ASSERT(len2 < len);
M
Minghao Li 已提交
118 119
  memset(buf, 0, len);
  snprintf(buf, len, "%s", serialized);
wafwerar's avatar
wafwerar 已提交
120
  taosMemoryFree(serialized);
M
Minghao Li 已提交
121 122 123 124

  cJSON_Delete(pRoot);
  return 0;
}
M
Minghao Li 已提交
125

M
Minghao Li 已提交
126
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
M
Minghao Li 已提交
127
  ASSERT(pRaftStore != NULL);
M
Minghao Li 已提交
128

M
Minghao Li 已提交
129
  ASSERT(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
M
Minghao Li 已提交
130
  cJSON *pRoot = cJSON_Parse(buf);
M
Minghao Li 已提交
131

M
Minghao Li 已提交
132
  cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
M
Minghao Li 已提交
133
  ASSERT(cJSON_IsString(pCurrentTerm));
M
Minghao Li 已提交
134
  sscanf(pCurrentTerm->valuestring, "%" PRIu64 "", &(pRaftStore->currentTerm));
M
Minghao Li 已提交
135

M
Minghao Li 已提交
136
  cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
M
Minghao Li 已提交
137
  ASSERT(cJSON_IsString(pVoteForAddr));
M
Minghao Li 已提交
138
  sscanf(pVoteForAddr->valuestring, "%" PRIu64 "", &(pRaftStore->voteFor.addr));
M
Minghao Li 已提交
139

M
Minghao Li 已提交
140 141
  cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
  pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
M
Minghao Li 已提交
142

M
Minghao Li 已提交
143 144 145
  cJSON_Delete(pRoot);
  return 0;
}
M
Minghao Li 已提交
146

M
Minghao Li 已提交
147 148
bool raftStoreHasVoted(SRaftStore *pRaftStore) {
  bool b = syncUtilEmptyId(&(pRaftStore->voteFor));
M
Minghao Li 已提交
149
  return (!b);
M
Minghao Li 已提交
150 151 152
}

void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) {
M
Minghao Li 已提交
153
  ASSERT(!syncUtilEmptyId(pRaftId));
M
Minghao Li 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
  pRaftStore->voteFor = *pRaftId;
  raftStorePersist(pRaftStore);
}

void raftStoreClearVote(SRaftStore *pRaftStore) {
  pRaftStore->voteFor = EMPTY_RAFT_ID;
  raftStorePersist(pRaftStore);
}

void raftStoreNextTerm(SRaftStore *pRaftStore) {
  ++(pRaftStore->currentTerm);
  raftStorePersist(pRaftStore);
}

void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
  pRaftStore->currentTerm = term;
  raftStorePersist(pRaftStore);
}