syncRaftLog.c 13.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftLog.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
19
#include "syncRaftStore.h"
S
Shengliang Guan 已提交
20
#include "syncUtil.h"
M
Minghao Li 已提交
21

22 23 24
// log[m .. n]

// forward declarations of file-local (static) log store functions
25
static int32_t   raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
26
static int32_t   raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync);
M
Minghao Li 已提交
27
static int32_t   raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
28
static bool      raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
M
Minghao Li 已提交
29 30
static int32_t   raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore);
31
static int32_t   raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
M
Minghao Li 已提交
32

33
/*
 * Create a log store bound to pSyncNode's WAL.
 *
 * Allocates the store, its LRU cache, its private SSyncLogStoreData and a WAL
 * reader, then fills the callback table with the raftLog* functions of this
 * file.
 *
 * Returns the new store, or NULL on failure. Every failure path releases what
 * was already acquired, so the caller never receives a half-built store.
 * Release a successfully created store with logStoreDestory().
 */
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
  SSyncLogStore* pLogStore = taosMemoryCalloc(1, sizeof(SSyncLogStore));
  if (pLogStore == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLogStore->pCache = taosLRUCacheInit(30 * 1024 * 1024, 1, .5);
  if (pLogStore->pCache == NULL) {
    taosMemoryFree(pLogStore);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pLogStore->cacheHit = 0;
  pLogStore->cacheMiss = 0;

  taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);

  // Handle allocation failure explicitly: an ASSERT alone gives no protection
  // when assertions are compiled out.
  SSyncLogStoreData* pData = taosMemoryMalloc(sizeof(SSyncLogStoreData));
  if (pData == NULL) {
    taosLRUCacheCleanup(pLogStore->pCache);
    taosMemoryFree(pLogStore);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pLogStore->data = pData;

  pData->pSyncNode = pSyncNode;
  pData->pWal = pSyncNode->pWal;
  // A sync node without a WAL is a caller bug, not a runtime condition.
  ASSERT(pData->pWal != NULL);

  taosThreadMutexInit(&(pData->mutex), NULL);
  pData->pWalHandle = walOpenReader(pData->pWal, NULL);
  if (pData->pWalHandle == NULL) {
    // Keep whatever error the WAL layer reported; fall back to out-of-memory
    // if it reported none. The cleanup calls below must not mask it.
    int32_t openErr = terrno;

    taosThreadMutexDestroy(&(pData->mutex));
    taosMemoryFree(pData);
    taosLRUCacheCleanup(pLogStore->pCache);
    taosMemoryFree(pLogStore);

    terrno = (openErr != 0) ? openErr : TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Callback table.
  pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;
  pLogStore->syncLogCommitIndex = raftlogCommitIndex;
  pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot;
  pLogStore->syncLogBeginIndex = raftLogBeginIndex;
  pLogStore->syncLogEndIndex = raftLogEndIndex;
  pLogStore->syncLogIsEmpty = raftLogIsEmpty;
  pLogStore->syncLogEntryCount = raftLogEntryCount;
  pLogStore->syncLogLastIndex = raftLogLastIndex;
  pLogStore->syncLogLastTerm = raftLogLastTerm;
  pLogStore->syncLogAppendEntry = raftLogAppendEntry;
  pLogStore->syncLogGetEntry = raftLogGetEntry;
  pLogStore->syncLogTruncate = raftLogTruncate;
  pLogStore->syncLogWriteIndex = raftLogWriteIndex;
  pLogStore->syncLogExist = raftLogExist;

  return pLogStore;
}

/*
 * Tear down a log store created by logStoreCreate().
 *
 * Closes the WAL reader (under the store mutex), destroys the mutex, frees the
 * private data, empties and cleans up the LRU cache, then frees the store.
 * A NULL pLogStore is a no-op.
 */
void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore == NULL) {
    return;
  }

  SSyncLogStoreData* pStoreData = pLogStore->data;

  // Close the reader while holding the mutex that guards it.
  taosThreadMutexLock(&(pStoreData->mutex));
  if (pStoreData->pWalHandle != NULL) {
    walCloseReader(pStoreData->pWalHandle);
    pStoreData->pWalHandle = NULL;
  }
  taosThreadMutexUnlock(&(pStoreData->mutex));
  taosThreadMutexDestroy(&(pStoreData->mutex));

  taosMemoryFree(pLogStore->data);

  taosLRUCacheEraseUnrefEntries(pLogStore->pCache);
  taosLRUCacheCleanup(pLogStore->pCache);

  taosMemoryFree(pLogStore);
}

// log[m .. n]
105
// Ask the WAL to restore itself from the snapshot at snapshotIndex
// (walRestoreFromSnapshot). snapshotIndex must be >= 0.
// Returns 0 on success; on failure logs terrno/errno details and returns -1.
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
  ASSERT(snapshotIndex >= 0);

  SSyncLogStoreData* pStoreData = pLogStore->data;

  if (walRestoreFromSnapshot(pStoreData->pWal, snapshotIndex) == 0) {
    return 0;
  }

  int32_t     walErr = terrno;
  const char* walErrMsg = tstrerror(walErr);
  int32_t     osErr = errno;
  const char* osErrMsg = strerror(osErr);

  sNError(pStoreData->pSyncNode,
          "wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex,
          walErr, walErrMsg, osErr, osErrMsg);
  return -1;
}
M
Minghao Li 已提交
125

126
SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
127 128
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
129 130
  SyncIndex          firstVer = walGetFirstVer(pWal);
  return firstVer;
M
Minghao Li 已提交
131 132
}

133
// End index of the log. This is inclusive: it is the same value as
// raftLogLastIndex().
SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) {
  SyncIndex endIndex = raftLogLastIndex(pLogStore);
  return endIndex;
}
M
Minghao Li 已提交
134

135
bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
136 137 138
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  return walIsEmpty(pWal);
M
Minghao Li 已提交
139 140
}

141
int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
142 143
  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);
M
Minghao Li 已提交
144
  int32_t   count = endIndex - beginIndex + 1;
M
Minghao Li 已提交
145 146 147
  return count > 0 ? count : 0;
}

148
SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
149
  SyncIndex          lastIndex;
M
Minghao Li 已提交
150 151 152
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
M
Minghao Li 已提交
153

154
  return lastVer;
M
Minghao Li 已提交
155 156
}

157
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
158 159 160 161
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
  return lastVer + 1;
M
Minghao Li 已提交
162 163
}

164 165 166 167 168 169 170
// Whether the WAL holds a log at `index` (walLogExist).
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  return walLogExist(pStoreData->pWal, index);
}

171 172 173
// if success, return last term
// if not log, return 0
// if error, return SYNC_TERM_INVALID
174
SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
175 176 177 178 179 180 181
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  if (walIsEmpty(pWal)) {
    return 0;
  } else {
    SSyncRaftEntry* pLastEntry;
    int32_t         code = raftLogGetLastEntry(pLogStore, &pLastEntry);
182 183 184 185 186 187 188
    if (code == 0 && pLastEntry != NULL) {
      SyncTerm lastTerm = pLastEntry->term;
      taosMemoryFree(pLastEntry);
      return lastTerm;
    } else {
      return SYNC_TERM_INVALID;
    }
189 190
  }

191 192
  // can not be here!
  return SYNC_TERM_INVALID;
193 194
}

195
// Append pEntry to the WAL at pEntry->index, then call walFsync with forceSync.
// The entry's isWeak/seqNum/term are passed to the WAL as its sync metadata.
// Returns 0 on success; if walAppendLog reports a negative index, logs the
// terrno/errno details and returns -1 without calling walFsync.
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SWal*              pWal = pStoreData->pWal;

  SWalSyncInfo walMeta = {0};
  walMeta.term = pEntry->term;
  walMeta.seqNum = pEntry->seqNum;
  walMeta.isWeek = pEntry->isWeak;

  // Time only the WAL append itself.
  int64_t   tsBefore = taosGetTimestampNs();
  SyncIndex writtenIndex =
      walAppendLog(pWal, pEntry->index, pEntry->originalRpcType, walMeta, pEntry->data, pEntry->dataLen);
  int64_t tsAfter = taosGetTimestampNs();
  int64_t appendNs = tsAfter - tsBefore;

  if (writtenIndex < 0) {
    int32_t     walErr = terrno;
    const char* walErrMsg = tstrerror(walErr);
    int32_t     osErr = errno;
    const char* osErrMsg = strerror(osErr);

    sNError(pStoreData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
            pEntry->index, walErr, walErrMsg, osErr, osErrMsg);
    return -1;
  }

  // The WAL must have stored the entry at exactly the requested index.
  ASSERT(pEntry->index == writtenIndex);

  walFsync(pWal, forceSync);

  sNTrace(pStoreData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
          TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), appendNs);
  return 0;
}

229 230 231
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
232
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
233 234
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
235
  int32_t            code = 0;
236 237 238

  *ppEntry = NULL;

239 240 241
  int64_t ts1 = taosGetTimestampNs();
  taosThreadMutexLock(&(pData->mutex));

L
Liu Jicong 已提交
242
  SWalReader* pWalHandle = pData->pWalHandle;
243
  if (pWalHandle == NULL) {
244
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
245
    sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId);
246
    taosThreadMutexUnlock(&(pData->mutex));
247 248 249
    return -1;
  }

M
Minghao Li 已提交
250
  int64_t ts2 = taosGetTimestampNs();
L
Liu Jicong 已提交
251
  code = walReadVer(pWalHandle, index);
252
  walReadReset(pWalHandle);
M
Minghao Li 已提交
253
  int64_t ts3 = taosGetTimestampNs();
254

255
  // code = walReadVerCached(pWalHandle, index);
256 257 258
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
259 260
    int32_t     sysErr = errno;
    const char* sysErrStr = strerror(errno);
261

S
Shengliang Guan 已提交
262
    if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
263 264
      sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index,
              err, errStr, sysErr, sysErrStr);
S
Shengliang Guan 已提交
265
    } else {
266 267
      sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index, err,
              errStr, sysErr, sysErrStr);
S
Shengliang Guan 已提交
268
    }
269

270 271 272 273 274
    /*
        int32_t saveErr = terrno;
        walCloseReadHandle(pWalHandle);
        terrno = saveErr;
    */
M
Minghao Li 已提交
275

276
    taosThreadMutexUnlock(&(pData->mutex));
277 278 279 280
    return code;
  }

  *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
281
  ASSERT(*ppEntry != NULL);
282 283 284 285 286 287
  (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
  (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
  (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
  (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
  (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
  (*ppEntry)->index = index;
288
  ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
289 290
  memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);

291 292 293 294 295
  /*
    int32_t saveErr = terrno;
    walCloseReadHandle(pWalHandle);
    terrno = saveErr;
  */
296

297
  taosThreadMutexUnlock(&(pData->mutex));
M
Minghao Li 已提交
298
  int64_t ts4 = taosGetTimestampNs();
M
Minghao Li 已提交
299 300 301 302 303 304 305 306 307 308 309

  int64_t tsElapsed = ts4 - ts1;
  int64_t tsElapsedLock = ts2 - ts1;
  int64_t tsElapsedRead = ts3 - ts2;
  int64_t tsElapsedBuild = ts4 - ts3;

  sNTrace(pData->pSyncNode,
          "read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64
          ", elapsed-build:%" PRId64,
          index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);

310 311
  return code;
}
M
Minghao Li 已提交
312

313
// truncate semantic
M
Minghao Li 已提交
314 315 316
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
317 318

  int32_t code = walRollback(pWal, fromIndex);
M
Minghao Li 已提交
319 320 321
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
322 323
    int32_t     sysErr = errno;
    const char* sysErrStr = strerror(errno);
324 325
    sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
           pData->pSyncNode->vgId, fromIndex, err, errStr, sysErr, sysErrStr);
M
Minghao Li 已提交
326
  }
M
Minghao Li 已提交
327 328

  // event log
S
Shengliang Guan 已提交
329
  sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex);
M
Minghao Li 已提交
330 331 332
  return code;
}

333 334 335
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
336 337 338
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
339
  ASSERT(ppLastEntry != NULL);
340 341 342 343 344 345 346

  *ppLastEntry = NULL;
  if (walIsEmpty(pWal)) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  } else {
    SyncIndex lastIndex = raftLogLastIndex(pLogStore);
347
    ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
348
    int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
349 350 351 352 353 354
    return code;
  }

  return -1;
}

M
Minghao Li 已提交
355
// Move the WAL commit position to `index` via walCommit.
// An index below the WAL snapshot version or above the WAL's last version is
// ignored and reported as success.
// Returns 0 on success or when ignored; logs and returns -1 when walCommit fails.
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SWal*              pWal = pStoreData->pWal;

  SyncIndex snapshotVer = walGetSnapshotVer(pWal);
  SyncIndex committedVer = walGetCommittedVer(pWal);  // NOTE(review): fetched but not used below
  SyncIndex lastVer = walGetLastVer(pWal);

  // Only indexes inside [snapshotVer, lastVer] are committed.
  bool withinWal = (index >= snapshotVer) && (index <= lastVer);
  if (!withinWal) {
    return 0;
  }

  if (walCommit(pWal, index) == 0) {
    return 0;
  }

  int32_t     walErr = terrno;
  const char* walErrMsg = tstrerror(walErr);
  int32_t     osErr = errno;
  const char* osErrMsg = strerror(osErr);
  sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
         pStoreData->pSyncNode->vgId, index, walErr, walErrMsg, osErr, osErrMsg);
  return -1;
}
M
Minghao Li 已提交
381

M
Minghao Li 已提交
382
// Commit index as tracked by the owning sync node (pSyncNode->commitIndex).
// This is read from the node, not from the WAL.
SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          nodeCommitIndex = pStoreData->pSyncNode->commitIndex;
  return nodeCommitIndex;
}

M
Minghao Li 已提交
387 388 389 390
// First version currently held by the WAL (walGetFirstVer).
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          firstVer = walGetFirstVer(pStoreData->pWal);
  return firstVer;
}

// Committed version as recorded by the WAL itself (walGetCommittedVer).
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          committedVer = walGetCommittedVer(pStoreData->pWal);
  return committedVer;
}