syncRaftLog.c 9.9 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftLog.h"
M
Minghao Li 已提交
17
#include "wal.h"
M
Minghao Li 已提交
18

M
Minghao Li 已提交
19
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
wafwerar's avatar
wafwerar 已提交
20
  SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
M
Minghao Li 已提交
21
  assert(pLogStore != NULL);
M
Minghao Li 已提交
22

wafwerar's avatar
wafwerar 已提交
23
  pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
M
Minghao Li 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36
  assert(pLogStore->data != NULL);

  SSyncLogStoreData* pData = pLogStore->data;
  pData->pSyncNode = pSyncNode;
  pData->pWal = pSyncNode->pWal;

  pLogStore->appendEntry = logStoreAppendEntry;
  pLogStore->getEntry = logStoreGetEntry;
  pLogStore->truncate = logStoreTruncate;
  pLogStore->getLastIndex = logStoreLastIndex;
  pLogStore->getLastTerm = logStoreLastTerm;
  pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
  pLogStore->getCommitIndex = logStoreGetCommitIndex;
M
Minghao Li 已提交
37
  return pLogStore;
M
Minghao Li 已提交
38 39 40 41
}

void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore != NULL) {
wafwerar's avatar
wafwerar 已提交
42 43
    taosMemoryFree(pLogStore->data);
    taosMemoryFree(pLogStore);
M
Minghao Li 已提交
44 45 46
  }
}

M
Minghao Li 已提交
47 48 49 50
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
51 52
  SyncIndex lastIndex = logStoreLastIndex(pLogStore);
  assert(pEntry->index == lastIndex + 1);
M
Minghao Li 已提交
53

M
Minghao Li 已提交
54 55 56 57 58 59
  int          code = 0;
  SSyncLogMeta syncMeta;
  syncMeta.isWeek = pEntry->isWeak;
  syncMeta.seqNum = pEntry->seqNum;
  syncMeta.term = pEntry->term;
  code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
M
Minghao Li 已提交
60 61 62
  if (code != 0) {
    int32_t err = terrno;
    const char *errStr = tstrerror(err);
M
Minghao Li 已提交
63
    sError("walWriteWithSyncInfo error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno));
M
Minghao Li 已提交
64 65 66
    ASSERT(0);
  }    
  //assert(code == 0);
M
Minghao Li 已提交
67

M
Minghao Li 已提交
68
  walFsync(pWal, true);
M
Minghao Li 已提交
69
  return code;
M
Minghao Li 已提交
70
}
M
Minghao Li 已提交
71

M
Minghao Li 已提交
72 73 74 75
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
76 77
  if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
    SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
M
Minghao Li 已提交
78 79 80 81
    int32_t code = walReadWithHandle(pWalHandle, index);
    if (code != 0) {
      int32_t err = terrno;
      const char *errStr = tstrerror(err);
M
Minghao Li 已提交
82
      sError("walReadWithHandle error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno));
M
Minghao Li 已提交
83 84 85
      ASSERT(0);
    }    
    //assert(walReadWithHandle(pWalHandle, index) == 0);
M
Minghao Li 已提交
86

L
Liu Jicong 已提交
87
    SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
M
Minghao Li 已提交
88 89
    assert(pEntry != NULL);

M
Minghao Li 已提交
90 91 92 93 94 95
    pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
    pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
    pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
    pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
    pEntry->term = pWalHandle->pHead->head.syncMeta.term;
    pEntry->index = index;
L
Liu Jicong 已提交
96 97
    assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
    memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
M
Minghao Li 已提交
98

M
Minghao Li 已提交
99 100
    // need to hold, do not new every time!!
    walCloseReadHandle(pWalHandle);
M
Minghao Li 已提交
101
    return pEntry;
M
Minghao Li 已提交
102

M
Minghao Li 已提交
103 104 105
  } else {
    return NULL;
  }
M
Minghao Li 已提交
106
}
M
Minghao Li 已提交
107

M
Minghao Li 已提交
108 109 110
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
111 112 113 114 115 116 117 118
  //assert(walRollback(pWal, fromIndex) == 0);
  int32_t code = walRollback(pWal, fromIndex);
  if (code != 0) {
    int32_t err = terrno;
    const char *errStr = tstrerror(err);
    sError("walRollback error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno));
    ASSERT(0);
  } 
M
Minghao Li 已提交
119
  return 0;  // to avoid compiler error
M
Minghao Li 已提交
120
}
M
Minghao Li 已提交
121

M
Minghao Li 已提交
122
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
123 124
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
125
  SyncIndex          lastIndex = walGetLastVer(pWal);
M
Minghao Li 已提交
126 127
  return lastIndex;
}
M
Minghao Li 已提交
128

M
Minghao Li 已提交
129
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
130
  SyncTerm        lastTerm = 0;
M
Minghao Li 已提交
131
  SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
M
Minghao Li 已提交
132 133
  if (pLastEntry != NULL) {
    lastTerm = pLastEntry->term;
wafwerar's avatar
wafwerar 已提交
134
    taosMemoryFree(pLastEntry);
M
Minghao Li 已提交
135
  }
M
Minghao Li 已提交
136 137
  return lastTerm;
}
M
Minghao Li 已提交
138

M
Minghao Li 已提交
139 140 141
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
142 143 144 145 146 147 148 149
  //assert(walCommit(pWal, index) == 0);
  int32_t code = walCommit(pWal, index);
  if (code != 0) {
    int32_t err = terrno;
    const char *errStr = tstrerror(err);
    sError("walCommit error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno));
    ASSERT(0);
  } 
M
Minghao Li 已提交
150
  return 0;  // to avoid compiler error
M
Minghao Li 已提交
151
}
M
Minghao Li 已提交
152

M
Minghao Li 已提交
153 154 155 156 157 158 159 160 161
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  return pData->pSyncNode->commitIndex;
}

SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastIndex = walGetLastVer(pWal);
M
Minghao Li 已提交
162 163 164 165 166

  SSyncRaftEntry* pEntry = NULL;
  if (lastIndex > 0) {
    pEntry = logStoreGetEntry(pLogStore, lastIndex);
  }
M
Minghao Li 已提交
167 168
  return pEntry;
}
M
Minghao Li 已提交
169

M
Minghao Li 已提交
170
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
171
  char               u64buf[128];
M
Minghao Li 已提交
172 173
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();
M
Minghao Li 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192

  if (pData != NULL && pData->pWal != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);

    cJSON* pEntries = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
    SyncIndex lastIndex = logStoreLastIndex(pLogStore);
    for (SyncIndex i = 0; i <= lastIndex; ++i) {
      SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
      syncEntryDestory(pEntry);
    }
M
Minghao Li 已提交
193 194 195 196 197 198
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
  return pJson;
}
M
Minghao Li 已提交
199 200 201 202 203 204

char* logStore2Str(SSyncLogStore* pLogStore) {
  cJSON* pJson = logStore2Json(pLogStore);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
M
Minghao Li 已提交
205 206
}

M
Minghao Li 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
  char               u64buf[128];
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot);
  return pJson;
}

char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
  cJSON* pJson = logStoreSimple2Json(pLogStore);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
235
// for debug -----------------
M
Minghao Li 已提交
236
void logStorePrint(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
237 238 239
  char* serialized = logStore2Str(pLogStore);
  printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
240
  taosMemoryFree(serialized);
M
Minghao Li 已提交
241 242 243 244
}

void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
245
  printf("logStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
M
Minghao Li 已提交
246
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
247
  taosMemoryFree(serialized);
M
Minghao Li 已提交
248
}
M
Minghao Li 已提交
249

M
Minghao Li 已提交
250 251
void logStoreLog(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
252
  sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
253
  taosMemoryFree(serialized);
M
Minghao Li 已提交
254 255 256 257
}

void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
258
  sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
wafwerar's avatar
wafwerar 已提交
259
  taosMemoryFree(serialized);
260
}
M
Minghao Li 已提交
261 262 263 264 265 266

// for debug -----------------
void logStoreSimplePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  printf("logStoreSimplePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
267
  taosMemoryFree(serialized);
M
Minghao Li 已提交
268 269 270 271 272 273
}

void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
274
  taosMemoryFree(serialized);
M
Minghao Li 已提交
275 276 277 278 279
}

void logStoreSimpleLog(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  sTrace("logStoreSimpleLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
280
  taosMemoryFree(serialized);
M
Minghao Li 已提交
281 282 283 284 285
}

void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
wafwerar's avatar
wafwerar 已提交
286
  taosMemoryFree(serialized);
L
Liu Jicong 已提交
287
}