syncRaftLog.c 13.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftLog.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
19
#include "syncRaftStore.h"
S
Shengliang Guan 已提交
20
#include "syncUtil.h"
M
Minghao Li 已提交
21

22 23 24
// log[m .. n]

// public function
25
static int32_t   raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
M
Minghao Li 已提交
26 27
static int32_t   raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t   raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
28
static bool      raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
M
Minghao Li 已提交
29 30
static int32_t   raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore);
31
static int32_t   raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
M
Minghao Li 已提交
32

33
// Create a log store for pSyncNode, backed by the node's WAL (pSyncNode->pWal).
//
// The new store gets:
//   - an LRU cache created with capacity argument 30 * 1024 * 1024 and strict
//     capacity switched off,
//   - a private SSyncLogStoreData holding the sync node, its WAL, a mutex and
//     a WAL reader,
//   - its function table pointing at the raftLog* functions of this file.
//
// pSyncNode  owning sync node; pSyncNode->pWal must be non-NULL (asserted).
//
// Returns the store, or NULL with terrno set when one of the allocations
// fails. Everything allocated up to the point of failure is released.
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
  SSyncLogStore* pLogStore = taosMemoryCalloc(1, sizeof(SSyncLogStore));
  if (pLogStore == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLogStore->pCache = taosLRUCacheInit(30 * 1024 * 1024, 1, .5);
  if (pLogStore->pCache == NULL) {
    taosMemoryFree(pLogStore);
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    return NULL;
  }

  taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);

  // Fail like the two allocations above instead of asserting: an assertion
  // either aborts the process or, if compiled out, lets the NULL pointer be
  // dereferenced a few lines further down.
  SSyncLogStoreData* pData = taosMemoryMalloc(sizeof(SSyncLogStoreData));
  if (pData == NULL) {
    taosLRUCacheCleanup(pLogStore->pCache);
    taosMemoryFree(pLogStore);
    // set last so the cleanup calls cannot overwrite it
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pLogStore->data = pData;

  pData->pSyncNode = pSyncNode;
  pData->pWal = pSyncNode->pWal;
  ASSERT(pData->pWal != NULL);

  // The mutex serialises use of the shared WAL reader (see raftLogGetEntry
  // and logStoreDestory).
  taosThreadMutexInit(&(pData->mutex), NULL);
  pData->pWalHandle = walOpenReader(pData->pWal, NULL);
  ASSERT(pData->pWalHandle != NULL);

  // function table
  pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;
  pLogStore->syncLogCommitIndex = raftlogCommitIndex;
  pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot;
  pLogStore->syncLogBeginIndex = raftLogBeginIndex;
  pLogStore->syncLogEndIndex = raftLogEndIndex;
  pLogStore->syncLogIsEmpty = raftLogIsEmpty;
  pLogStore->syncLogEntryCount = raftLogEntryCount;
  pLogStore->syncLogLastIndex = raftLogLastIndex;
  pLogStore->syncLogLastTerm = raftLogLastTerm;
  pLogStore->syncLogAppendEntry = raftLogAppendEntry;
  pLogStore->syncLogGetEntry = raftLogGetEntry;
  pLogStore->syncLogTruncate = raftLogTruncate;
  pLogStore->syncLogWriteIndex = raftLogWriteIndex;
  pLogStore->syncLogExist = raftLogExist;

  return pLogStore;
}

// Tear down a log store: close the WAL reader while holding the data mutex,
// destroy the mutex, free the private data, drop unreferenced cache entries,
// clean up the cache and finally free the store itself.
// Passing NULL is a no-op.
void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore == NULL) {
    return;
  }

  SSyncLogStoreData* pStoreData = pLogStore->data;

  // The reader is shared with raftLogGetEntry(), so close it under the lock.
  taosThreadMutexLock(&(pStoreData->mutex));
  if (pStoreData->pWalHandle != NULL) {
    walCloseReader(pStoreData->pWalHandle);
    pStoreData->pWalHandle = NULL;
  }
  taosThreadMutexUnlock(&(pStoreData->mutex));
  taosThreadMutexDestroy(&(pStoreData->mutex));

  taosMemoryFree(pStoreData);

  taosLRUCacheEraseUnrefEntries(pLogStore->pCache);
  taosLRUCacheCleanup(pLogStore->pCache);

  taosMemoryFree(pLogStore);
}

// log[m .. n]
102 103
// Ask the WAL to restore itself from a snapshot at snapshotIndex
// (walRestoreFromSnapshot). snapshotIndex must be >= 0 (asserted).
// Returns 0 on success; on failure logs the terrno / errno details and
// returns -1.
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
  ASSERT(snapshotIndex >= 0);

  SSyncLogStoreData* pStoreData = pLogStore->data;
  if (walRestoreFromSnapshot(pStoreData->pWal, snapshotIndex) == 0) {
    return 0;
  }

  // failure: report both the TDengine error and the OS error
  int32_t     walErr = terrno;
  const char* walErrMsg = tstrerror(walErr);
  int32_t     osErr = errno;
  const char* osErrMsg = strerror(errno);

  sNError(pStoreData->pSyncNode,
          "wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
          snapshotIndex, walErr, walErr, walErrMsg, osErr, osErrMsg);
  return -1;
}
M
Minghao Li 已提交
122

123
SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
124 125
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
126 127
  SyncIndex          firstVer = walGetFirstVer(pWal);
  return firstVer;
M
Minghao Li 已提交
128 129
}

130
// End index of the log; identical to raftLogLastIndex().
SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) {
  SyncIndex endIndex = raftLogLastIndex(pLogStore);
  return endIndex;
}
M
Minghao Li 已提交
131

132
bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
133 134 135
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  return walIsEmpty(pWal);
M
Minghao Li 已提交
136 137
}

138
// Number of entries in the inclusive range [begin index, end index].
//
// Returns 0 when the range is empty (end < begin). The indexes are 64-bit, so
// the difference is computed in 64 bits and clamped to INT32_MAX. It is never
// narrowed first, which could wrap to a wrong or negative count.
int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);

  int64_t count = (int64_t)endIndex - (int64_t)beginIndex + 1;
  if (count <= 0) {
    return 0;
  }
  if (count > INT32_MAX) {
    return INT32_MAX;
  }
  return (int32_t)count;
}

145
SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
146
  SyncIndex          lastIndex;
M
Minghao Li 已提交
147 148 149
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
M
Minghao Li 已提交
150

151
  return lastVer;
M
Minghao Li 已提交
152 153
}

154
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
155 156 157 158
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
  return lastVer + 1;
M
Minghao Li 已提交
159 160
}

161 162 163 164 165 166 167
// True when the WAL reports that an entry exists at `index` (walLogExist).
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  return walLogExist(pStoreData->pWal, index);
}

168 169 170
// if success, return last term
// if not log, return 0
// if error, return SYNC_TERM_INVALID
171
SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
172 173 174 175 176 177 178
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  if (walIsEmpty(pWal)) {
    return 0;
  } else {
    SSyncRaftEntry* pLastEntry;
    int32_t         code = raftLogGetLastEntry(pLogStore, &pLastEntry);
179 180 181 182 183 184 185
    if (code == 0 && pLastEntry != NULL) {
      SyncTerm lastTerm = pLastEntry->term;
      taosMemoryFree(pLastEntry);
      return lastTerm;
    } else {
      return SYNC_TERM_INVALID;
    }
186 187
  }

188 189
  // can not be here!
  return SYNC_TERM_INVALID;
190 191
}

M
Minghao Li 已提交
192 193 194 195 196
// Append pEntry to the WAL.
//
// The entry's isWeak / seqNum / term are passed along as WAL sync metadata,
// together with its original RPC type and payload. On success the index
// returned by walAppendLog() is stored in pEntry->index and 0 is returned.
// If walAppendLog() returns a negative index the error is logged and -1 is
// returned (pEntry->index is left untouched).
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pStoreData = pLogStore->data;

  SWalSyncInfo walMeta = {0};
  walMeta.term = pEntry->term;
  walMeta.seqNum = pEntry->seqNum;
  walMeta.isWeek = pEntry->isWeak;

  // time only the WAL write itself
  int64_t   startNs = taosGetTimestampNs();
  SyncIndex writtenIndex =
      walAppendLog(pStoreData->pWal, pEntry->originalRpcType, walMeta, pEntry->data, pEntry->dataLen);
  int64_t endNs = taosGetTimestampNs();
  int64_t costNs = endNs - startNs;

  if (writtenIndex < 0) {
    int32_t     walErr = terrno;
    const char* walErrMsg = tstrerror(walErr);
    int32_t     osErr = errno;
    const char* osErrMsg = strerror(errno);

    sNError(pStoreData->pSyncNode, "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
            pEntry->index, walErr, walErr, walErrMsg, osErr, osErrMsg);
    return -1;
  }

  pEntry->index = writtenIndex;

  sNTrace(pStoreData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
          TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), costNs);
  return 0;
}

224 225 226
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
227
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
228 229
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
230
  int32_t            code = 0;
231 232 233

  *ppEntry = NULL;

234
  // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
L
Liu Jicong 已提交
235
  SWalReader* pWalHandle = pData->pWalHandle;
236
  if (pWalHandle == NULL) {
237
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
238 239
    sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId);

240 241 242
    return -1;
  }

M
Minghao Li 已提交
243
  int64_t ts1 = taosGetTimestampNs();
244 245
  taosThreadMutexLock(&(pData->mutex));

M
Minghao Li 已提交
246
  int64_t ts2 = taosGetTimestampNs();
L
Liu Jicong 已提交
247
  code = walReadVer(pWalHandle, index);
M
Minghao Li 已提交
248
  int64_t ts3 = taosGetTimestampNs();
249

250
  // code = walReadVerCached(pWalHandle, index);
251 252 253
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
254 255
    int32_t     sysErr = errno;
    const char* sysErrStr = strerror(errno);
256

S
Shengliang Guan 已提交
257 258 259 260 261 262 263
    if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
              err, err, errStr, sysErr, sysErrStr);
    } else {
      sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
              err, err, errStr, sysErr, sysErrStr);
    }
264

265 266 267 268 269
    /*
        int32_t saveErr = terrno;
        walCloseReadHandle(pWalHandle);
        terrno = saveErr;
    */
M
Minghao Li 已提交
270

271
    taosThreadMutexUnlock(&(pData->mutex));
272 273 274 275 276 277 278 279 280 281 282 283 284 285
    return code;
  }

  *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
  ASSERT(*ppEntry != NULL);
  (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
  (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
  (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
  (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
  (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
  (*ppEntry)->index = index;
  ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
  memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);

286 287 288 289 290
  /*
    int32_t saveErr = terrno;
    walCloseReadHandle(pWalHandle);
    terrno = saveErr;
  */
291

292
  taosThreadMutexUnlock(&(pData->mutex));
M
Minghao Li 已提交
293
  int64_t ts4 = taosGetTimestampNs();
M
Minghao Li 已提交
294 295 296 297 298 299 300 301 302 303 304

  int64_t tsElapsed = ts4 - ts1;
  int64_t tsElapsedLock = ts2 - ts1;
  int64_t tsElapsedRead = ts3 - ts2;
  int64_t tsElapsedBuild = ts4 - ts3;

  sNTrace(pData->pSyncNode,
          "read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64
          ", elapsed-build:%" PRId64,
          index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);

305 306
  return code;
}
M
Minghao Li 已提交
307

308
// truncate semantic
M
Minghao Li 已提交
309 310 311
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
312 313 314 315 316 317 318

  // need not truncate
  SyncIndex wallastVer = walGetLastVer(pWal);
  if (fromIndex > wallastVer) {
    return 0;
  }

319 320 321 322 323 324
  // need not truncate
  SyncIndex walCommitVer = walGetCommittedVer(pWal);
  if (fromIndex <= walCommitVer) {
    return 0;
  }

325 326 327
  // delete from cache
  for (SyncIndex index = fromIndex; index <= wallastVer; ++index) {
    SLRUCache* pCache = pData->pSyncNode->pLogStore->pCache;
328 329
    taosLRUCacheErase(pData->pSyncNode->pLogStore->pCache, &index, sizeof(index));
#if 0  
330 331 332 333 334 335
    LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
    if (h) {
      sNTrace(pData->pSyncNode, "cache delete index:%" PRId64, index);

      taosLRUCacheRelease(pData->pSyncNode->pLogStore->pCache, h, true);
    }
336
#endif
337 338
  }

339
  int32_t code = walRollback(pWal, fromIndex);
M
Minghao Li 已提交
340 341 342
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
343 344
    int32_t     sysErr = errno;
    const char* sysErrStr = strerror(errno);
S
Shengliang Guan 已提交
345
    sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
346 347
           pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr);

M
Minghao Li 已提交
348
    // ASSERT(0);
M
Minghao Li 已提交
349
  }
M
Minghao Li 已提交
350 351

  // event log
S
Shengliang Guan 已提交
352
  sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex);
M
Minghao Li 已提交
353 354 355
  return code;
}

356 357 358
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
359 360 361 362 363 364 365 366 367 368 369
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  ASSERT(ppLastEntry != NULL);

  *ppLastEntry = NULL;
  if (walIsEmpty(pWal)) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  } else {
    SyncIndex lastIndex = raftLogLastIndex(pLogStore);
370 371
    ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
    int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
372 373 374 375 376 377
    return code;
  }

  return -1;
}

M
Minghao Li 已提交
378
// Commit the WAL up to `index` (walCommit).
// Returns 0 on success. On failure the error is logged, ASSERT(0) fires, and
// -1 is returned if execution continues past the assertion.
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pStoreData = pLogStore->data;

  if (walCommit(pStoreData->pWal, index) == 0) {
    return 0;
  }

  int32_t     walErr = terrno;
  const char* walErrMsg = tstrerror(walErr);
  int32_t     osErr = errno;
  const char* osErrMsg = strerror(errno);
  sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
         pStoreData->pSyncNode->vgId, index, walErr, walErr, walErrMsg, osErr, osErrMsg);

  ASSERT(0);
  return -1;
}
M
Minghao Li 已提交
396

M
Minghao Li 已提交
397
// Commit index as tracked by the owning sync node (pSyncNode->commitIndex);
// the WAL is not consulted.
SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SSyncNode*         pNode = pStoreData->pSyncNode;
  return pNode->commitIndex;
}

M
Minghao Li 已提交
402 403 404 405
// First version currently held by the store's WAL (walGetFirstVer).
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          firstVer = walGetFirstVer(pStoreData->pWal);
  return firstVer;
}

// Committed version as reported by the store's WAL (walGetCommittedVer).
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          committedVer = walGetCommittedVer(pStoreData->pWal);
  return committedVer;
}