syncRaftLog.c 10.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftLog.h"
M
Minghao Li 已提交
17
#include "wal.h"
M
Minghao Li 已提交
18

M
Minghao Li 已提交
19
// Allocate a log store backed by the WAL of the given sync node.
//
// The store keeps the pSyncNode and pSyncNode->pWal pointers as given (no copy is made)
// and its function-pointer table is wired to the logStore* functions of this file.
// Release the result with logStoreDestory().
//
// pSyncNode: sync node whose pWal backs the store; it is dereferenced, so must not be NULL.
// Returns the new store, or NULL if either allocation fails.
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
  SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
  if (pLogStore == NULL) {
    return NULL;
  }

  // Explicit checks instead of assert(): assert() disappears under NDEBUG and
  // the code below would then dereference a NULL pointer.
  SSyncLogStoreData* pData = taosMemoryMalloc(sizeof(SSyncLogStoreData));
  if (pData == NULL) {
    taosMemoryFree(pLogStore);
    return NULL;
  }

  pData->pSyncNode = pSyncNode;
  pData->pWal = pSyncNode->pWal;
  pLogStore->data = pData;

  pLogStore->appendEntry = logStoreAppendEntry;
  pLogStore->getEntry = logStoreGetEntry;
  pLogStore->truncate = logStoreTruncate;
  pLogStore->getLastIndex = logStoreLastIndex;
  pLogStore->getLastTerm = logStoreLastTerm;
  pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
  pLogStore->getCommitIndex = logStoreGetCommitIndex;
  return pLogStore;
}

// Free a log store and its private data block. A NULL argument is ignored.
// Only the two allocations owned by the store are released; the sync node and
// WAL pointers held inside the data block are not touched.
void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore == NULL) {
    return;
  }

  taosMemoryFree(pLogStore->data);
  taosMemoryFree(pLogStore);
}

M
Minghao Li 已提交
47 48 49 50
// Append one raft entry to the WAL behind the log store.
//
// The entry must be the direct successor of the current last index
// (pEntry->index == logStoreLastIndex() + 1); this is asserted.
// On success the WAL is synced with walFsync(pWal, true).
//
// Returns 0 on success, otherwise the non-zero code from walWriteWithSyncInfo().
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  SyncIndex lastIndex = logStoreLastIndex(pLogStore);
  assert(pEntry->index == lastIndex + 1);

  // Zero-initialise so that any field not set below has a defined value.
  SSyncLogMeta syncMeta = {0};
  syncMeta.isWeek = pEntry->isWeak;
  syncMeta.seqNum = pEntry->seqNum;
  syncMeta.term = pEntry->term;

  int code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
  if (code != 0) {
    // Capture errno first: later library calls are allowed to overwrite it.
    int32_t     linuxErr = errno;
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
    ASSERT(0);
    // NOTE(review): reached only if ASSERT does not abort in this build; do not fsync a failed write.
    return code;
  }

  walFsync(pWal, true);
  return code;
}
M
Minghao Li 已提交
74

M
Minghao Li 已提交
75 76 77 78
// Read the entry stored at `index` from the WAL and return it as a newly built SSyncRaftEntry.
//
// A read handle is opened and closed on every call.
// The returned entry is allocated by syncEntryBuild(); callers in this file release it
// themselves (see logStoreLastTerm / logStore2Json).
//
// Returns NULL when index is outside [SYNC_INDEX_BEGIN, logStoreLastIndex()],
// when the read handle cannot be opened, when the WAL read fails, or when the
// entry cannot be allocated.
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  if (index < SYNC_INDEX_BEGIN || index > logStoreLastIndex(pLogStore)) {
    return NULL;
  }

  SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
  if (pWalHandle == NULL) {
    sError("walOpenReadHandle error");
    return NULL;
  }

  int32_t code = walReadWithHandle(pWalHandle, index);
  if (code != 0) {
    // Capture errno first: later library calls are allowed to overwrite it.
    int32_t     linuxErr = errno;
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
    ASSERT(0);
    // NOTE(review): reached only if ASSERT does not abort in this build; release the handle
    // instead of reading from a head that was never filled in.
    walCloseReadHandle(pWalHandle);
    return NULL;
  }

  SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
  if (pEntry == NULL) {
    walCloseReadHandle(pWalHandle);
    return NULL;
  }

  pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
  pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
  pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
  pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
  pEntry->term = pWalHandle->pHead->head.syncMeta.term;
  pEntry->index = index;
  assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
  memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);

  // need to hold, do not new every time!!
  walCloseReadHandle(pWalHandle);
  return pEntry;
}
M
Minghao Li 已提交
113

M
Minghao Li 已提交
114 115 116
// Roll the underlying WAL back by calling walRollback(pWal, fromIndex).
//
// Returns 0 on success, otherwise the non-zero code from walRollback()
// (the failure is also logged and asserted).
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  int32_t code = walRollback(pWal, fromIndex);
  if (code != 0) {
    // Capture errno first: later library calls are allowed to overwrite it.
    int32_t     linuxErr = errno;
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
    ASSERT(0);
  }

  // Propagate the failure instead of reporting success when ASSERT does not abort.
  return code;
}
M
Minghao Li 已提交
130

M
Minghao Li 已提交
131
// Return the last index of the log, i.e. whatever walGetLastVer() reports
// for the WAL attached to this store.
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  return walGetLastVer(pStoreData->pWal);
}
M
Minghao Li 已提交
137

M
Minghao Li 已提交
138
// Return the term of the last log entry, or 0 when logStoreGetLastEntry()
// yields no entry. The temporary entry is freed before returning.
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
  SSyncRaftEntry* pTail = logStoreGetLastEntry(pLogStore);
  if (pTail == NULL) {
    return 0;
  }

  SyncTerm tailTerm = pTail->term;
  // NOTE(review): other code in this file releases entries with syncEntryDestory();
  // confirm that a plain taosMemoryFree() is equivalent for entries from syncEntryBuild().
  taosMemoryFree(pTail);
  return tailTerm;
}
M
Minghao Li 已提交
147

M
Minghao Li 已提交
148 149 150
// Forward a new commit index to the WAL by calling walCommit(pWal, index).
//
// Returns 0 on success, otherwise the non-zero code from walCommit()
// (the failure is also logged and asserted).
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  int32_t code = walCommit(pWal, index);
  if (code != 0) {
    // Capture errno first: later library calls are allowed to overwrite it.
    int32_t     linuxErr = errno;
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
    ASSERT(0);
  }

  // Propagate the failure instead of reporting success when ASSERT does not abort.
  return code;
}
M
Minghao Li 已提交
163

M
Minghao Li 已提交
164 165 166 167 168 169 170 171 172
// Return the commit index, read from the commitIndex field of the sync node
// this store was created for (not from the WAL).
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SSyncNode*         pNode = pStoreData->pSyncNode;
  return pNode->commitIndex;
}

// Fetch the entry at the WAL's last version via logStoreGetEntry().
// Returns NULL when the last version is not greater than 0, or when
// logStoreGetEntry() itself returns NULL. The caller releases the entry.
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          tailIndex = walGetLastVer(pStoreData->pWal);

  // NOTE(review): logStoreGetEntry() accepts indexes >= SYNC_INDEX_BEGIN, while this
  // guard requires > 0 — confirm the two lower bounds are meant to differ.
  return (tailIndex > 0) ? logStoreGetEntry(pLogStore, tailIndex) : NULL;
}
M
Minghao Li 已提交
180

M
Minghao Li 已提交
181
// Build a JSON description of the log store, including every entry.
//
// Layout: { "SSyncLogStore": { "pSyncNode", "pWal", "LastIndex", "LastTerm", "pEntries": [...] } }.
// All scalar values are emitted as strings. When the store has no data block or
// no WAL, the inner object is left empty.
// Every index from 0 to the last index is read through logStoreGetEntry(), so the
// cost grows with the length of the log; intended for debugging.
//
// Returns a cJSON tree that the caller must release with cJSON_Delete().
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
  char               u64buf[128];
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    SyncIndex lastIndex = logStoreLastIndex(pLogStore);

    // %p requires a void* argument.
    snprintf(u64buf, sizeof(u64buf), "%p", (void*)pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", (void*)pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

    // Cast explicitly: %ld/%lu only match these types on platforms where long has their width.
    snprintf(u64buf, sizeof(u64buf), "%lld", (long long)lastIndex);
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%llu", (unsigned long long)logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);

    cJSON* pEntries = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
    for (SyncIndex i = 0; i <= lastIndex; ++i) {
      // NOTE(review): pEntry may be NULL if logStoreGetEntry() rejects the index or fails;
      // this relies on syncEntry2Json()/syncEntryDestory() accepting NULL — confirm.
      SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
      syncEntryDestory(pEntry);
    }
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
  return pJson;
}
M
Minghao Li 已提交
210 211 212 213 214 215

// Serialize the full JSON description of the log store (see logStore2Json)
// into a string produced by cJSON_Print(). The intermediate JSON tree is
// deleted here; the returned string belongs to the caller.
char* logStore2Str(SSyncLogStore* pLogStore) {
  cJSON* pTree = logStore2Json(pLogStore);
  char*  text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
// Build a short JSON description of the log store, without the entries.
//
// Layout: { "SSyncLogStoreSimple": { "pSyncNode", "pWal", "LastIndex", "LastTerm" } }.
// All values are emitted as strings. When the store has no data block or no WAL,
// the inner object is left empty.
//
// Returns a cJSON tree that the caller must release with cJSON_Delete().
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
  char               u64buf[128];
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    // %p requires a void* argument.
    snprintf(u64buf, sizeof(u64buf), "%p", (void*)pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", (void*)pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

    // Cast explicitly: %ld/%lu only match these types on platforms where long has their width.
    snprintf(u64buf, sizeof(u64buf), "%lld", (long long)logStoreLastIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%llu", (unsigned long long)logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot);
  return pJson;
}

// Serialize the short JSON description of the log store (see logStoreSimple2Json)
// into a string produced by cJSON_Print(). The intermediate JSON tree is
// deleted here; the returned string belongs to the caller.
char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
  cJSON* pTree = logStoreSimple2Json(pLogStore);
  char*  text = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
246
// for debug -----------------
M
Minghao Li 已提交
247
// Debug helper: print the full JSON dump of the log store to stdout and flush.
// Nothing is printed if the dump could not be serialized.
void logStorePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    // strlen(NULL) and "%s" with NULL are undefined behavior.
    return;
  }

  // strlen() yields size_t; cast so the argument matches %lu.
  printf("logStorePrint | len:%lu | %s \n", (unsigned long)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: print the caller-supplied tag `s` followed by the full JSON dump
// of the log store to stdout, then flush.
// Nothing is printed if the dump could not be serialized.
void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    // strlen(NULL) and "%s" with NULL are undefined behavior.
    return;
  }

  // strlen() yields size_t; cast so the argument matches %lu.
  printf("logStorePrint2 | len:%lu | %s | %s \n", (unsigned long)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}
M
Minghao Li 已提交
260

M
Minghao Li 已提交
261 262
// Debug helper: write the full JSON dump of the log store through sTraceLong.
// Nothing is logged if the dump could not be serialized.
void logStoreLog(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    // strlen(NULL) and "%s" with NULL are undefined behavior.
    return;
  }

  // strlen() yields size_t; cast so the argument matches %lu.
  sTraceLong("logStoreLog | len:%lu | %s", (unsigned long)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: write the caller-supplied tag `s` followed by the full JSON dump
// of the log store through sTraceLong.
// Nothing is logged if the dump could not be serialized.
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    // strlen(NULL) and "%s" with NULL are undefined behavior.
    return;
  }

  // strlen() yields size_t; cast so the argument matches %lu.
  sTraceLong("logStoreLog2 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}
M
Minghao Li 已提交
272 273 274 275 276 277

// for debug -----------------
// Debug helper: print the short JSON dump of the log store (no entries) to stdout and flush.
// Nothing is printed if the dump could not be serialized.
void logStoreSimplePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    // strlen(NULL) and "%s" with NULL are undefined behavior.
    return;
  }

  // strlen() yields size_t; cast so the argument matches %lu.
  printf("logStoreSimplePrint | len:%lu | %s \n", (unsigned long)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: print the caller-supplied tag `s` followed by the short JSON dump
// of the log store (no entries) to stdout, then flush.
// Nothing is printed if the dump could not be serialized.
void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    // strlen(NULL) and "%s" with NULL are undefined behavior.
    return;
  }

  // strlen() yields size_t; cast so the argument matches %lu.
  printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", (unsigned long)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: write the short JSON dump of the log store (no entries) through sTrace.
// Nothing is logged if the dump could not be serialized.
void logStoreSimpleLog(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    // strlen(NULL) and "%s" with NULL are undefined behavior.
    return;
  }

  // strlen() yields size_t; cast so the argument matches %lu.
  sTrace("logStoreSimpleLog | len:%lu | %s", (unsigned long)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: write the caller-supplied tag `s` followed by the short JSON dump
// of the log store (no entries) through sTrace.
// Nothing is logged if the dump could not be serialized.
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    // strlen(NULL) and "%s" with NULL are undefined behavior.
    return;
  }

  // strlen() yields size_t; cast so the argument matches %lu.
  sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}