/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "syncRaftLog.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" #include "syncUtil.h" // log[m .. n] // public function static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex); static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index); static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore); static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry); SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* pLogStore = taosMemoryCalloc(1, sizeof(SSyncLogStore)); if (pLogStore == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } // pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5); pLogStore->pCache = taosLRUCacheInit(30 * 1024 * 1024, 1, .5); if (pLogStore->pCache == NULL) { taosMemoryFree(pLogStore); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pLogStore->cacheHit = 0; pLogStore->cacheMiss = 0; taosLRUCacheSetStrictCapacity(pLogStore->pCache, false); pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); ASSERT(pLogStore->data != NULL); SSyncLogStoreData* pData = pLogStore->data; pData->pSyncNode = pSyncNode; pData->pWal = pSyncNode->pWal; ASSERT(pData->pWal != NULL); taosThreadMutexInit(&(pData->mutex), NULL); pData->pWalHandle = walOpenReader(pData->pWal, NULL); ASSERT(pData->pWalHandle != NULL); pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex; pLogStore->syncLogCommitIndex = raftlogCommitIndex; pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot; pLogStore->syncLogBeginIndex = raftLogBeginIndex; pLogStore->syncLogEndIndex = raftLogEndIndex; pLogStore->syncLogIsEmpty = raftLogIsEmpty; pLogStore->syncLogEntryCount = raftLogEntryCount; pLogStore->syncLogLastIndex = raftLogLastIndex; pLogStore->syncLogLastTerm = raftLogLastTerm; pLogStore->syncLogAppendEntry = raftLogAppendEntry; pLogStore->syncLogGetEntry = raftLogGetEntry; pLogStore->syncLogTruncate = raftLogTruncate; pLogStore->syncLogWriteIndex = raftLogWriteIndex; pLogStore->syncLogExist = raftLogExist; return pLogStore; } void logStoreDestory(SSyncLogStore* pLogStore) { if (pLogStore != NULL) { SSyncLogStoreData* pData = pLogStore->data; taosThreadMutexLock(&(pData->mutex)); if (pData->pWalHandle != NULL) { walCloseReader(pData->pWalHandle); pData->pWalHandle = NULL; } taosThreadMutexUnlock(&(pData->mutex)); taosThreadMutexDestroy(&(pData->mutex)); taosMemoryFree(pLogStore->data); taosLRUCacheEraseUnrefEntries(pLogStore->pCache); taosLRUCacheCleanup(pLogStore->pCache); taosMemoryFree(pLogStore); } } // log[m .. n] static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) { ASSERT(snapshotIndex >= 0); SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; int32_t code = walRestoreFromSnapshot(pWal, snapshotIndex); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); sNError(pData->pSyncNode, "wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex, err, errStr, sysErr, sysErrStr); return -1; } return 0; } SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex firstVer = walGetFirstVer(pWal); return firstVer; } SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) { return raftLogLastIndex(pLogStore); } bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; return walIsEmpty(pWal); } int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) { SyncIndex beginIndex = raftLogBeginIndex(pLogStore); SyncIndex endIndex = raftLogEndIndex(pLogStore); int32_t count = endIndex - beginIndex + 1; return count > 0 ? count : 0; } SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) { SyncIndex lastIndex; SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex lastVer = walGetLastVer(pWal); return lastVer; } SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex lastVer = walGetLastVer(pWal); return lastVer + 1; } static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; bool b = walLogExist(pWal, index); return b; } // if success, return last term // if not log, return 0 // if error, return SYNC_TERM_INVALID SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; if (walIsEmpty(pWal)) { return 0; } else { SSyncRaftEntry* pLastEntry; int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry); if (code == 0 && pLastEntry != NULL) { SyncTerm lastTerm = pLastEntry->term; taosMemoryFree(pLastEntry); return lastTerm; } else { return SYNC_TERM_INVALID; } } // can not be here! return SYNC_TERM_INVALID; } static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); } static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex index = 0; SWalSyncInfo syncMeta = {0}; syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; int64_t tsWriteBegin = taosGetTimestampNs(); index = walAppendLog(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); int64_t tsWriteEnd = taosGetTimestampNs(); int64_t tsElapsed = tsWriteEnd - tsWriteBegin; if (index < 0) { int32_t err = terrno; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", pEntry->index, err, errStr, sysErr, sysErrStr); return -1; } ASSERT(pEntry->index == index); bool forceSync = raftLogForceSync(pEntry); walFsync(pWal, forceSync); sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); return 0; } // entry found, return 0 // entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST // other error, return -1 int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; int32_t code = 0; *ppEntry = NULL; int64_t ts1 = taosGetTimestampNs(); taosThreadMutexLock(&(pData->mutex)); SWalReader* pWalHandle = pData->pWalHandle; if (pWalHandle == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId); taosThreadMutexUnlock(&(pData->mutex)); return -1; } int64_t ts2 = taosGetTimestampNs(); code = walReadVer(pWalHandle, index); int64_t ts3 = taosGetTimestampNs(); // code = walReadVerCached(pWalHandle, index); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index, err, errStr, sysErr, sysErrStr); } else { sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index, err, errStr, sysErr, sysErrStr); } /* int32_t saveErr = terrno; walCloseReadHandle(pWalHandle); terrno = saveErr; */ taosThreadMutexUnlock(&(pData->mutex)); return code; } *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); ASSERT(*ppEntry != NULL); (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST; (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType; (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum; (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term; (*ppEntry)->index = index; ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen); memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); /* int32_t saveErr = terrno; walCloseReadHandle(pWalHandle); terrno = saveErr; */ taosThreadMutexUnlock(&(pData->mutex)); int64_t ts4 = taosGetTimestampNs(); int64_t tsElapsed = ts4 - ts1; int64_t tsElapsedLock = ts2 - ts1; int64_t tsElapsedRead = ts3 - ts2; int64_t tsElapsedBuild = ts4 - ts3; sNTrace(pData->pSyncNode, "read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64 ", elapsed-build:%" PRId64, index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild); return code; } // truncate semantic static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; int32_t code = walRollback(pWal, fromIndex); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, fromIndex, err, errStr, sysErr, sysErrStr); } // event log sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex); return code; } // entry found, return 0 // entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST // other error, return -1 static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; ASSERT(ppLastEntry != NULL); *ppLastEntry = NULL; if (walIsEmpty(pWal)) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } else { SyncIndex lastIndex = raftLogLastIndex(pLogStore); ASSERT(lastIndex >= SYNC_INDEX_BEGIN); int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); return code; } return -1; } int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; // need not update SyncIndex snapshotVer = walGetSnapshotVer(pWal); SyncIndex walCommitVer = walGetCommittedVer(pWal); SyncIndex wallastVer = walGetLastVer(pWal); if (index < snapshotVer || index > wallastVer) { // ignore return 0; } int32_t code = walCommit(pWal, index); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr); return -1; } return 0; } SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; return pData->pSyncNode->commitIndex; } SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; return walGetFirstVer(pWal); } SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; return walGetCommittedVer(pWal); }