syncRaftLog.c 13.6 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftLog.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
19
#include "syncRaftStore.h"
S
Shengliang Guan 已提交
20
#include "syncUtil.h"
M
Minghao Li 已提交
21

22 23 24
// log[m .. n]

// public function
25
static int32_t   raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
M
Minghao Li 已提交
26 27
static int32_t   raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t   raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
28
static bool      raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
M
Minghao Li 已提交
29 30
static int32_t   raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore);
31
static int32_t   raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
M
Minghao Li 已提交
32

33
/*
 * Create the raft log store of a sync node.
 *
 * The store reads and writes through the node's WAL (pSyncNode->pWal, which
 * must be non-NULL). It owns three things:
 *   - an LRU entry cache;
 *   - a mutex;
 *   - a WAL reader, which raftLogGetEntry() uses while holding that mutex.
 * Every SSyncLogStore callback is wired to the raftLog* implementation in this
 * file.
 *
 * Returns the new store. On failure it returns NULL with terrno set, after
 * releasing every resource acquired so far.
 */
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
  SSyncLogStoreData* pData = NULL;

  SSyncLogStore* pLogStore = taosMemoryCalloc(1, sizeof(SSyncLogStore));
  if (pLogStore == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Entry cache with a 30 MiB capacity.
  pLogStore->pCache = taosLRUCacheInit(30 * 1024 * 1024, 1, .5);
  if (pLogStore->pCache == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    goto _err_free_store;
  }

  pLogStore->cacheHit = 0;
  pLogStore->cacheMiss = 0;

  // NOTE(review): non-strict capacity presumably lets inserts succeed past the
  // capacity instead of failing (RocksDB-style semantics) — confirm.
  taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);

  // Checked explicitly: an ASSERT may be compiled out, which would leave a NULL
  // dereference on the lines below.
  pData = taosMemoryMalloc(sizeof(SSyncLogStoreData));
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err_cleanup_cache;
  }
  pLogStore->data = pData;

  pData->pSyncNode = pSyncNode;
  pData->pWal = pSyncNode->pWal;
  ASSERT(pData->pWal != NULL);  // caller must provide a node whose pWal is set

  taosThreadMutexInit(&(pData->mutex), NULL);

  pData->pWalHandle = walOpenReader(pData->pWal, NULL);
  if (pData->pWalHandle == NULL) {
    // NOTE(review): relies on walOpenReader() having set terrno — confirm.
    goto _err_destroy_mutex;
  }

  pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;
  pLogStore->syncLogCommitIndex = raftlogCommitIndex;
  pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot;
  pLogStore->syncLogBeginIndex = raftLogBeginIndex;
  pLogStore->syncLogEndIndex = raftLogEndIndex;
  pLogStore->syncLogIsEmpty = raftLogIsEmpty;
  pLogStore->syncLogEntryCount = raftLogEntryCount;
  pLogStore->syncLogLastIndex = raftLogLastIndex;
  pLogStore->syncLogLastTerm = raftLogLastTerm;
  pLogStore->syncLogAppendEntry = raftLogAppendEntry;
  pLogStore->syncLogGetEntry = raftLogGetEntry;
  pLogStore->syncLogTruncate = raftLogTruncate;
  pLogStore->syncLogWriteIndex = raftLogWriteIndex;
  pLogStore->syncLogExist = raftLogExist;

  return pLogStore;

  // Unwind in reverse order of acquisition.
_err_destroy_mutex:
  taosThreadMutexDestroy(&(pData->mutex));
  taosMemoryFree(pData);
_err_cleanup_cache:
  taosLRUCacheCleanup(pLogStore->pCache);
_err_free_store:
  taosMemoryFree(pLogStore);
  return NULL;
}

/*
 * Tear down a store created by logStoreCreate(); a NULL store is ignored.
 *
 * Release order:
 *   1. close the WAL reader, with the store mutex held;
 *   2. destroy the mutex;
 *   3. free the store data;
 *   4. erase unreferenced cache entries, then clean up the LRU cache;
 *   5. free the store itself.
 */
void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore == NULL) {
    return;
  }

  SSyncLogStoreData* pStoreData = pLogStore->data;

  taosThreadMutexLock(&(pStoreData->mutex));
  if (pStoreData->pWalHandle != NULL) {
    walCloseReader(pStoreData->pWalHandle);
    pStoreData->pWalHandle = NULL;
  }
  taosThreadMutexUnlock(&(pStoreData->mutex));
  taosThreadMutexDestroy(&(pStoreData->mutex));

  taosMemoryFree(pStoreData);

  taosLRUCacheEraseUnrefEntries(pLogStore->pCache);
  taosLRUCacheCleanup(pLogStore->pCache);

  taosMemoryFree(pLogStore);
}

// log[m .. n]
105 106
/*
 * syncLogRestoreFromSnapshot callback: delegates to walRestoreFromSnapshot()
 * for the store's WAL.
 * Returns 0 on success. On failure it logs the terrno/errno details and
 * returns -1.
 */
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
  ASSERT(snapshotIndex >= 0);

  SSyncLogStoreData* pStoreData = pLogStore->data;
  if (walRestoreFromSnapshot(pStoreData->pWal, snapshotIndex) == 0) {
    return 0;
  }

  // Capture the error state straight away, before anything else can overwrite it.
  int32_t     walErr = terrno;
  const char* walErrMsg = tstrerror(walErr);
  int32_t     osErr = errno;
  const char* osErrMsg = strerror(errno);

  sNError(pStoreData->pSyncNode,
          "wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
          snapshotIndex, walErr, walErr, walErrMsg, osErr, osErrMsg);
  return -1;
}
M
Minghao Li 已提交
125

126
SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
127 128
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
129 130
  SyncIndex          firstVer = walGetFirstVer(pWal);
  return firstVer;
M
Minghao Li 已提交
131 132
}

133
// End index of the log; by definition the same as raftLogLastIndex().
SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) {
  SyncIndex endIndex = raftLogLastIndex(pLogStore);
  return endIndex;
}
M
Minghao Li 已提交
134

135
bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
136 137 138
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  return walIsEmpty(pWal);
M
Minghao Li 已提交
139 140
}

141
/*
 * Number of entries in the log, i.e. the size of [begin index, end index].
 *
 * Returns 0 for an empty log or an inverted range. The subtraction is done in
 * SyncIndex (64-bit; it is printed with PRId64 elsewhere), and the result
 * saturates at INT32_MAX. Narrowing first to the int32_t return type could
 * otherwise wrap.
 */
int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
  // Empty-log guard: if the WAL reports sentinel first/last versions while empty
  // (e.g. -1/-1 — NOTE(review): confirm), the formula alone would yield 1.
  if (raftLogIsEmpty(pLogStore)) {
    return 0;
  }

  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);
  SyncIndex count = endIndex - beginIndex + 1;

  if (count <= 0) {
    return 0;
  }
  if (count > INT32_MAX) {
    return INT32_MAX;
  }
  return (int32_t)count;
}

148
SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
149
  SyncIndex          lastIndex;
M
Minghao Li 已提交
150 151 152
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
M
Minghao Li 已提交
153

154
  return lastVer;
M
Minghao Li 已提交
155 156
}

157
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
158 159 160 161
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
  return lastVer + 1;
M
Minghao Li 已提交
162 163
}

164 165 166 167 168 169 170
// syncLogExist callback: whether the WAL holds an entry at `index` (walLogExist()).
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  return walLogExist(pStoreData->pWal, index);
}

171 172 173
// if success, return last term
// if not log, return 0
// if error, return SYNC_TERM_INVALID
174
SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
175 176 177 178 179 180 181
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  if (walIsEmpty(pWal)) {
    return 0;
  } else {
    SSyncRaftEntry* pLastEntry;
    int32_t         code = raftLogGetLastEntry(pLogStore, &pLastEntry);
182 183 184 185 186 187 188
    if (code == 0 && pLastEntry != NULL) {
      SyncTerm lastTerm = pLastEntry->term;
      taosMemoryFree(pLastEntry);
      return lastTerm;
    } else {
      return SYNC_TERM_INVALID;
    }
189 190
  }

191 192
  // can not be here!
  return SYNC_TERM_INVALID;
193 194
}

M
Minghao Li 已提交
195 196 197 198 199
/*
 * syncLogAppendEntry callback: append pEntry's payload to the WAL, together
 * with its sync metadata (isWeak, seqNum, term).
 *
 * On success the index returned by walAppendLog() is stored in pEntry->index
 * and 0 is returned. On failure the error is logged and -1 is returned;
 * pEntry is left unchanged.
 */
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SWal*              pWal = pStoreData->pWal;

  // Fields not named here are zero-initialized.
  SWalSyncInfo meta = {
      .isWeek = pEntry->isWeak,
      .seqNum = pEntry->seqNum,
      .term = pEntry->term,
  };

  int64_t   startNs = taosGetTimestampNs();
  SyncIndex writtenIndex = walAppendLog(pWal, pEntry->originalRpcType, meta, pEntry->data, pEntry->dataLen);
  int64_t   costNs = taosGetTimestampNs() - startNs;

  if (writtenIndex < 0) {
    int32_t     walErr = terrno;
    const char* walErrMsg = tstrerror(walErr);
    int32_t     osErr = errno;
    const char* osErrMsg = strerror(errno);

    sNError(pStoreData->pSyncNode, "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
            pEntry->index, walErr, walErr, walErrMsg, osErr, osErrMsg);
    return -1;
  }

  pEntry->index = writtenIndex;
  sNTrace(pStoreData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
          TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), costNs);
  return 0;
}

227 228 229
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
230
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
231 232
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
233
  int32_t            code = 0;
234 235 236

  *ppEntry = NULL;

237
  // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
L
Liu Jicong 已提交
238
  SWalReader* pWalHandle = pData->pWalHandle;
239
  if (pWalHandle == NULL) {
240
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
241 242
    sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId);

243 244 245
    return -1;
  }

M
Minghao Li 已提交
246
  int64_t ts1 = taosGetTimestampNs();
247 248
  taosThreadMutexLock(&(pData->mutex));

M
Minghao Li 已提交
249
  int64_t ts2 = taosGetTimestampNs();
L
Liu Jicong 已提交
250
  code = walReadVer(pWalHandle, index);
M
Minghao Li 已提交
251
  int64_t ts3 = taosGetTimestampNs();
252

253
  // code = walReadVerCached(pWalHandle, index);
254 255 256
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
257 258
    int32_t     sysErr = errno;
    const char* sysErrStr = strerror(errno);
259

S
Shengliang Guan 已提交
260 261 262 263 264 265 266
    if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
              err, err, errStr, sysErr, sysErrStr);
    } else {
      sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
              err, err, errStr, sysErr, sysErrStr);
    }
267

268 269 270 271 272
    /*
        int32_t saveErr = terrno;
        walCloseReadHandle(pWalHandle);
        terrno = saveErr;
    */
M
Minghao Li 已提交
273

274
    taosThreadMutexUnlock(&(pData->mutex));
275 276 277 278 279 280 281 282 283 284 285 286 287 288
    return code;
  }

  *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
  ASSERT(*ppEntry != NULL);
  (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
  (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
  (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
  (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
  (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
  (*ppEntry)->index = index;
  ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
  memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);

289 290 291 292 293
  /*
    int32_t saveErr = terrno;
    walCloseReadHandle(pWalHandle);
    terrno = saveErr;
  */
294

295
  taosThreadMutexUnlock(&(pData->mutex));
M
Minghao Li 已提交
296
  int64_t ts4 = taosGetTimestampNs();
M
Minghao Li 已提交
297 298 299 300 301 302 303 304 305 306 307

  int64_t tsElapsed = ts4 - ts1;
  int64_t tsElapsedLock = ts2 - ts1;
  int64_t tsElapsedRead = ts3 - ts2;
  int64_t tsElapsedBuild = ts4 - ts3;

  sNTrace(pData->pSyncNode,
          "read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64
          ", elapsed-build:%" PRId64,
          index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);

308 309
  return code;
}
M
Minghao Li 已提交
310

311
// truncate semantic
M
Minghao Li 已提交
312 313 314
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
315 316 317 318 319 320 321

  // need not truncate
  SyncIndex wallastVer = walGetLastVer(pWal);
  if (fromIndex > wallastVer) {
    return 0;
  }

322 323 324 325 326 327
  // need not truncate
  SyncIndex walCommitVer = walGetCommittedVer(pWal);
  if (fromIndex <= walCommitVer) {
    return 0;
  }

328 329 330 331 332 333 334 335 336 337 338
  // delete from cache
  for (SyncIndex index = fromIndex; index <= wallastVer; ++index) {
    SLRUCache* pCache = pData->pSyncNode->pLogStore->pCache;
    LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
    if (h) {
      sNTrace(pData->pSyncNode, "cache delete index:%" PRId64, index);

      taosLRUCacheRelease(pData->pSyncNode->pLogStore->pCache, h, true);
    }
  }

339
  int32_t code = walRollback(pWal, fromIndex);
M
Minghao Li 已提交
340 341 342
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
343 344
    int32_t     sysErr = errno;
    const char* sysErrStr = strerror(errno);
S
Shengliang Guan 已提交
345
    sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
346 347
           pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr);

M
Minghao Li 已提交
348
    // ASSERT(0);
M
Minghao Li 已提交
349
  }
M
Minghao Li 已提交
350 351

  // event log
S
Shengliang Guan 已提交
352
  sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex);
M
Minghao Li 已提交
353 354 355
  return code;
}

356 357 358
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
359 360 361 362 363 364 365 366 367 368 369
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  ASSERT(ppLastEntry != NULL);

  *ppLastEntry = NULL;
  if (walIsEmpty(pWal)) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  } else {
    SyncIndex lastIndex = raftLogLastIndex(pLogStore);
370 371
    ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
    int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
372 373 374 375 376 377
    return code;
  }

  return -1;
}

M
Minghao Li 已提交
378
/*
 * syncLogUpdateCommitIndex callback: commit the WAL up to `index` via walCommit().
 *
 * Indexes below the WAL snapshot version or above the last WAL version are
 * ignored and 0 is returned. A walCommit() failure is logged and trips
 * ASSERT(0); if ASSERT does not abort, -1 is returned.
 *
 * The unused WAL committed-version read of the previous version has been dropped.
 */
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  SyncIndex snapshotVer = walGetSnapshotVer(pWal);
  SyncIndex wallastVer = walGetLastVer(pWal);
  if (index < snapshotVer || index > wallastVer) {
    // outside the committable range of the WAL: ignore
    return 0;
  }

  int32_t code = walCommit(pWal, index);
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     sysErr = errno;
    const char* sysErrStr = strerror(sysErr);
    sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
           pData->pSyncNode->vgId, index, err, err, errStr, sysErr, sysErrStr);
    ASSERT(0);
    return -1;
  }

  return 0;
}
M
Minghao Li 已提交
406

M
Minghao Li 已提交
407
// syncLogCommitIndex callback: the commit index tracked by the owning sync node
// (pSyncNode->commitIndex), not the WAL's committed version.
SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          commitIndex = pStoreData->pSyncNode->commitIndex;
  return commitIndex;
}

M
Minghao Li 已提交
412 413 414 415
// First version present in the store's WAL (walGetFirstVer()).
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          firstVer = walGetFirstVer(pStoreData->pWal);
  return firstVer;
}

// Committed version recorded in the store's WAL (walGetCommittedVer()).
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          commitVer = walGetCommittedVer(pStoreData->pWal);
  return commitVer;
}