syncRaftLog.c 10.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftLog.h"
M
Minghao Li 已提交
17
#include "wal.h"
M
Minghao Li 已提交
18

M
Minghao Li 已提交
19
/**
 * Allocate a log store bound to a sync node.
 *
 * The store's private data records pSyncNode and pSyncNode->pWal, and every
 * operation pointer of the store is wired to the matching logStore* function.
 * Allocation failures are treated as fatal via assert().
 *
 * @param pSyncNode  sync node backing the store; it is dereferenced here, so it
 *                   must not be NULL
 * @return the newly allocated store; release it with logStoreDestory()
 */
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
  SSyncLogStore* pStore = taosMemoryMalloc(sizeof(SSyncLogStore));
  assert(pStore != NULL);

  // private state of the store
  SSyncLogStoreData* pStoreData = taosMemoryMalloc(sizeof(SSyncLogStoreData));
  pStore->data = pStoreData;
  assert(pStore->data != NULL);

  pStoreData->pSyncNode = pSyncNode;
  pStoreData->pWal = pSyncNode->pWal;

  // log entry operations
  pStore->appendEntry = logStoreAppendEntry;
  pStore->getEntry = logStoreGetEntry;
  pStore->truncate = logStoreTruncate;

  // log position queries
  pStore->getLastIndex = logStoreLastIndex;
  pStore->getLastTerm = logStoreLastTerm;

  // commit index operations
  pStore->updateCommitIndex = logStoreUpdateCommitIndex;
  pStore->getCommitIndex = logStoreGetCommitIndex;

  return pStore;
}

/**
 * Release a log store and its private data.
 *
 * @param pLogStore  store to free; NULL is accepted and ignored
 */
void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore == NULL) {
    return;
  }

  // free the private data first, then the store that points to it
  taosMemoryFree(pLogStore->data);
  taosMemoryFree(pLogStore);
}

M
Minghao Li 已提交
47 48 49 50
/**
 * Append one raft entry to the WAL behind the log store and fsync it.
 *
 * The entry must directly follow the current last index (enforced by assert).
 * A write failure is logged with both the terrno and errno descriptions and
 * then trips ASSERT(0).
 *
 * @param pLogStore  log store whose WAL receives the entry
 * @param pEntry     entry to persist; its index, term, seqNum, isWeak,
 *                   originalRpcType and data are written
 * @return the return code of walWriteWithSyncInfo (0 on success)
 */
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  // entries may only be appended contiguously
  SyncIndex lastIndex = logStoreLastIndex(pLogStore);
  assert(pEntry->index == lastIndex + 1);

  // zero-initialize so that nothing left unassigned below is handed to the WAL
  // as indeterminate stack content
  SSyncLogMeta syncMeta = {0};
  syncMeta.isWeek = pEntry->isWeak;
  syncMeta.seqNum = pEntry->seqNum;
  syncMeta.term = pEntry->term;

  int code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
  if (code != 0) {
    // capture errno first: any later library call is allowed to overwrite it
    int32_t     linuxErr = errno;
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walWriteWithSyncInfo error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
    ASSERT(0);
  }

  walFsync(pWal, true);
  return code;
}
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75 76 77
/**
 * Read the raft entry stored at `index` from the WAL.
 *
 * A read handle is opened for the call and closed before returning. Failures
 * to open the handle or to read the record are logged and trip ASSERT(0); if
 * the assertion is compiled out the handle is released and NULL is returned
 * instead of using an unread record.
 *
 * @param pLogStore  log store to read from
 * @param index      index of the wanted entry
 * @return a newly built entry owned by the caller (logStore2Json releases it
 *         with syncEntryDestory), or NULL when `index` lies outside
 *         [SYNC_INDEX_BEGIN, last index] or the read failed
 */
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  if (index < SYNC_INDEX_BEGIN || index > logStoreLastIndex(pLogStore)) {
    return NULL;
  }

  // TODO: keep one long-lived read handle instead of opening a new one per call
  SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
  if (pWalHandle == NULL) {
    int32_t     linuxErr = errno;  // capture before any other call can overwrite it
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walOpenReadHandle error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
    ASSERT(0);
    return NULL;
  }

  int32_t code = walReadWithHandle(pWalHandle, index);
  if (code != 0) {
    int32_t     linuxErr = errno;  // capture before any other call can overwrite it
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walReadWithHandle error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
    ASSERT(0);

    // only reached when ASSERT is a no-op: do not touch the unread record
    walCloseReadHandle(pWalHandle);
    return NULL;
  }

  SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
  assert(pEntry != NULL);

  // rebuild the entry from the WAL record header
  pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
  pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
  pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
  pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
  pEntry->term = pWalHandle->pHead->head.syncMeta.term;
  pEntry->index = index;

  // the entry was built for exactly bodyLen bytes of payload
  assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
  memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);

  walCloseReadHandle(pWalHandle);
  return pEntry;
}
M
Minghao Li 已提交
111

M
Minghao Li 已提交
112 113 114
/**
 * Roll the WAL behind the log store back to `fromIndex` via walRollback.
 *
 * A failure is logged with both the terrno and errno descriptions and then
 * trips ASSERT(0).
 *
 * @param pLogStore  log store whose WAL is rolled back
 * @param fromIndex  index handed to walRollback
 * @return the return code of walRollback (0 on success), so a failure is not
 *         reported as success when ASSERT is compiled out
 */
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  int32_t code = walRollback(pWal, fromIndex);
  if (code != 0) {
    // capture errno first: any later library call is allowed to overwrite it
    int32_t     linuxErr = errno;
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walRollback error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
    ASSERT(0);
  }
  return code;
}
M
Minghao Li 已提交
127

M
Minghao Li 已提交
128
/**
 * Report the index of the last entry in the log store.
 *
 * @param pLogStore  log store to query
 * @return the value of walGetLastVer() for the store's WAL
 */
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  return walGetLastVer(pStoreData->pWal);
}
M
Minghao Li 已提交
134

M
Minghao Li 已提交
135
/**
 * Report the term of the last entry in the log store.
 *
 * @param pLogStore  log store to query
 * @return the term of the entry returned by logStoreGetLastEntry(), or 0 when
 *         that function yields no entry
 */
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
  SyncTerm        lastTerm = 0;
  SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
  if (pLastEntry != NULL) {
    lastTerm = pLastEntry->term;
    // release through the entry destructor, as logStore2Json does for entries
    // obtained from the log store
    syncEntryDestory(pLastEntry);
  }
  return lastTerm;
}
M
Minghao Li 已提交
144

M
Minghao Li 已提交
145 146 147
/**
 * Commit the WAL behind the log store up to `index` via walCommit.
 *
 * A failure is logged with both the terrno and errno descriptions and then
 * trips ASSERT(0).
 *
 * @param pLogStore  log store whose WAL is committed
 * @param index      index handed to walCommit
 * @return the return code of walCommit (0 on success), so a failure is not
 *         reported as success when ASSERT is compiled out
 */
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  int32_t code = walCommit(pWal, index);
  if (code != 0) {
    // capture errno first: any later library call is allowed to overwrite it
    int32_t     linuxErr = errno;
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    const char* linuxErrMsg = strerror(linuxErr);
    sError("walCommit error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
    ASSERT(0);
  }
  return code;
}
M
Minghao Li 已提交
160

M
Minghao Li 已提交
161 162 163 164 165 166 167 168 169
/**
 * Report the commit index tracked by the sync node that owns this log store.
 *
 * @param pLogStore  log store to query
 * @return the commitIndex field of the store's sync node
 */
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SSyncNode*         pOwner = pStoreData->pSyncNode;
  return pOwner->commitIndex;
}

/**
 * Fetch the entry at the WAL's last version.
 *
 * @param pLogStore  log store to read from
 * @return the entry returned by logStoreGetEntry() for the last index, or NULL
 *         when the last index is not greater than 0
 */
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          newestIndex = walGetLastVer(pStoreData->pWal);

  // NOTE(review): an entry at index 0 is never returned here, although
  // logStoreGetEntry accepts indexes from SYNC_INDEX_BEGIN - confirm intended
  if (newestIndex <= 0) {
    return NULL;
  }
  return logStoreGetEntry(pLogStore, newestIndex);
}
M
Minghao Li 已提交
177

M
Minghao Li 已提交
178
/**
 * Build a JSON description of the log store, including every entry.
 *
 * The result is an object with a single "SSyncLogStore" member. When the store
 * has private data and a WAL, that member carries the pSyncNode and pWal
 * pointers, LastIndex, LastTerm and a "pEntries" array with one element per
 * index from 0 to the last index; otherwise it is an empty object.
 *
 * @param pLogStore  log store to describe
 * @return the JSON tree; the caller releases it with cJSON_Delete()
 */
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
  char               u64buf[128];
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    // queried once and reused for both the "LastIndex" field and the loop bound
    SyncIndex lastIndex = logStoreLastIndex(pLogStore);

    // %p requires a void* argument
    snprintf(u64buf, sizeof(u64buf), "%p", (void*)pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", (void*)pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

    // cast to (unsigned) long long so the conversion matches the argument on
    // platforms where long is narrower than the index/term types
    snprintf(u64buf, sizeof(u64buf), "%lld", (long long)lastIndex);
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%llu", (unsigned long long)logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);

    cJSON* pEntries = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
    for (SyncIndex i = 0; i <= lastIndex; ++i) {
      // each entry is a fresh copy that is released right after serialization
      SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
      syncEntryDestory(pEntry);
    }
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
  return pJson;
}
M
Minghao Li 已提交
207 208 209 210 211 212

/**
 * Serialize the full JSON description of the log store to text.
 *
 * @param pLogStore  log store to describe
 * @return the string produced by cJSON_Print(); the caller owns it (callers in
 *         this file release it with taosMemoryFree)
 */
char* logStore2Str(SSyncLogStore* pLogStore) {
  cJSON* pTree = logStore2Json(pLogStore);
  char*  text = cJSON_Print(pTree);

  // the tree is only needed to produce the text
  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
/**
 * Build a compact JSON description of the log store, without the entries.
 *
 * The result is an object with a single "SSyncLogStoreSimple" member. When the
 * store has private data and a WAL, that member carries the pSyncNode and pWal
 * pointers, LastIndex and LastTerm; otherwise it is an empty object.
 *
 * @param pLogStore  log store to describe
 * @return the JSON tree; the caller releases it with cJSON_Delete()
 */
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
  char               u64buf[128];
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    // %p requires a void* argument
    snprintf(u64buf, sizeof(u64buf), "%p", (void*)pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", (void*)pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

    // cast to (unsigned) long long so the conversion matches the argument on
    // platforms where long is narrower than the index/term types
    snprintf(u64buf, sizeof(u64buf), "%lld", (long long)logStoreLastIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%llu", (unsigned long long)logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot);
  return pJson;
}

/**
 * Serialize the compact JSON description of the log store to text.
 *
 * @param pLogStore  log store to describe
 * @return the string produced by cJSON_Print(); the caller owns it (callers in
 *         this file release it with taosMemoryFree)
 */
char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
  cJSON* pTree = logStoreSimple2Json(pLogStore);
  char*  text = cJSON_Print(pTree);

  // the tree is only needed to produce the text
  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
243
// for debug -----------------
M
Minghao Li 已提交
244
/**
 * Debug helper: print the full description of the log store to stdout.
 *
 * Nothing is printed when the description could not be serialized.
 *
 * @param pLogStore  log store to dump
 */
void logStorePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    return;  // serialization failed; strlen(NULL) would be undefined
  }
  // strlen yields size_t; cast so the argument matches the %lu conversion
  printf("logStorePrint | len:%lu | %s \n", (unsigned long)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

/**
 * Debug helper: print a caller-supplied label followed by the full description
 * of the log store to stdout.
 *
 * Nothing is printed when the description could not be serialized.
 *
 * @param s          label printed before the description
 * @param pLogStore  log store to dump
 */
void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    return;  // serialization failed; strlen(NULL) would be undefined
  }
  // strlen yields size_t; cast so the argument matches the %lu conversion
  printf("logStorePrint2 | len:%lu | %s | %s \n", (unsigned long)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}
M
Minghao Li 已提交
257

M
Minghao Li 已提交
258 259
/**
 * Debug helper: write the full description of the log store to the trace log.
 *
 * Nothing is logged when the description could not be serialized.
 *
 * @param pLogStore  log store to dump
 */
void logStoreLog(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    return;  // serialization failed; strlen(NULL) would be undefined
  }
  // strlen yields size_t; cast so the argument matches the %lu conversion
  sTraceLong("logStoreLog | len:%lu | %s", (unsigned long)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

/**
 * Debug helper: write a caller-supplied label followed by the full description
 * of the log store to the trace log.
 *
 * Nothing is logged when the description could not be serialized.
 *
 * @param s          label logged before the description
 * @param pLogStore  log store to dump
 */
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    return;  // serialization failed; strlen(NULL) would be undefined
  }
  // strlen yields size_t; cast so the argument matches the %lu conversion
  sTraceLong("logStoreLog2 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}
M
Minghao Li 已提交
269 270 271 272 273 274

// for debug -----------------
/**
 * Debug helper: print the compact description of the log store to stdout.
 *
 * Nothing is printed when the description could not be serialized.
 *
 * @param pLogStore  log store to dump
 */
void logStoreSimplePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    return;  // serialization failed; strlen(NULL) would be undefined
  }
  // strlen yields size_t; cast so the argument matches the %lu conversion
  printf("logStoreSimplePrint | len:%lu | %s \n", (unsigned long)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

/**
 * Debug helper: print a caller-supplied label followed by the compact
 * description of the log store to stdout.
 *
 * Nothing is printed when the description could not be serialized.
 *
 * @param s          label printed before the description
 * @param pLogStore  log store to dump
 */
void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    return;  // serialization failed; strlen(NULL) would be undefined
  }
  // strlen yields size_t; cast so the argument matches the %lu conversion
  printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", (unsigned long)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

/**
 * Debug helper: write the compact description of the log store to the trace
 * log.
 *
 * Nothing is logged when the description could not be serialized.
 *
 * @param pLogStore  log store to dump
 */
void logStoreSimpleLog(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    return;  // serialization failed; strlen(NULL) would be undefined
  }
  // strlen yields size_t; cast so the argument matches the %lu conversion
  sTrace("logStoreSimpleLog | len:%lu | %s", (unsigned long)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

/**
 * Debug helper: write a caller-supplied label followed by the compact
 * description of the log store to the trace log.
 *
 * Nothing is logged when the description could not be serialized.
 *
 * @param s          label logged before the description
 * @param pLogStore  log store to dump
 */
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    return;  // serialization failed; strlen(NULL) would be undefined
  }
  // strlen yields size_t; cast so the argument matches the %lu conversion
  sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}