diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e6a4dd1d493969a333005a64f515ba35dde34573..285e079b3ec90a066cd70fa3e7576dac3d5c8b8d 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -22,6 +22,7 @@ extern "C" { #include "cJSON.h" #include "tdef.h" +#include "tlrucache.h" #include "tmsgcb.h" extern bool gRaftDetailLog; @@ -153,7 +154,8 @@ typedef struct SSyncFSM { // abstract definition of log store in raft // SWal implements it typedef struct SSyncLogStore { - void* data; + SLRUCache* pCache; + void* data; // append one log entry int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 1e68fe346c71d65e0594643da5b94e2dd1ab204d..b604d25816e300560571458fde7153196e77eee5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -69,15 +69,26 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (agree) { // term - SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); - ASSERT(pEntry != NULL); - + SSyncRaftEntry* pEntry = NULL; + SLRUCache* pCache = pSyncNode->pLogStore->pCache; + LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index)); + if (h) { + pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); + } else { + pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); + ASSERT(pEntry != NULL); + } // cannot commit, even if quorum agree. need check term! if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { // update commit index newCommitIndex = index; - syncEntryDestory(pEntry); + if (h) { + taosLRUCacheRelease(pCache, h, false); + } else { + syncEntryDestory(pEntry); + } + break; } else { do { @@ -88,7 +99,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } while (0); } - syncEntryDestory(pEntry); + if (h) { + taosLRUCacheRelease(pCache, h, false); + } else { + syncEntryDestory(pEntry); + } } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6f29b54f806f1113ec69dface7bdcbb4b0c42afc..17157fbd2327ce126b4e63e509c7f0935539860e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2581,6 +2581,20 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { return ret; } +static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); } + +static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) { + int code = 0; + int entryLen = sizeof(*pEntry) + pEntry->dataLen; + LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen, + deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + return code; +} + static int32_t syncNodeAppendNoop(SSyncNode* ths) { int32_t ret = 0; @@ -2589,13 +2603,21 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); ASSERT(pEntry != NULL); + LRUHandle* h = NULL; + syncCacheEntry(ths->pLogStore, pEntry, &h); + if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); ASSERT(code == 0); syncNodeReplicate(ths, false); } - syncEntryDestory(pEntry); + if (h) { + taosLRUCacheRelease(ths->pLogStore->pCache, h, false); + } else { + syncEntryDestory(pEntry); + } + return ret; } @@ -2654,6 +2676,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); ASSERT(pEntry != NULL); + LRUHandle* h = NULL; + syncCacheEntry(ths->pLogStore, pEntry, &h); + if (ths->state == TAOS_SYNC_STATE_LEADER) { // append entry code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); @@ -2685,7 +2710,12 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI } } - syncEntryDestory(pEntry); + if (h) { + taosLRUCacheRelease(ths->pLogStore->pCache, h, false); + } else { + syncEntryDestory(pEntry); + } + return ret; } @@ -2973,9 +3003,15 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, for (SyncIndex i = beginIndex; i <= endIndex; ++i) { if (i != SYNC_INDEX_INVALID) { SSyncRaftEntry* pEntry; - code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); - ASSERT(code == 0); - ASSERT(pEntry != NULL); + SLRUCache* pCache = ths->pLogStore->pCache; + LRUHandle* h = taosLRUCacheLookup(pCache, &i, sizeof(i)); + if (h) { + pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); + } else { + code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); + ASSERT(code == 0); + ASSERT(pEntry != NULL); + } SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); @@ -3058,7 +3094,11 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, } rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); + if (h) { + taosLRUCacheRelease(pCache, h, false); + } else { + syncEntryDestory(pEntry); + } } } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 0649e064e45391cfe9082c24264a33b762d1a279..496c8419de7f56a96d544997f989ef6d16de4de3 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -53,6 +53,15 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); ASSERT(pLogStore != NULL); + pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5); + if (pLogStore->pCache == NULL) { + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; + taosMemoryFree(pLogStore); + return NULL; + } + + taosLRUCacheSetStrictCapacity(pLogStore->pCache, false); + pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); ASSERT(pLogStore->data != NULL); @@ -102,6 +111,10 @@ void logStoreDestory(SSyncLogStore* pLogStore) { taosThreadMutexDestroy(&(pData->mutex)); taosMemoryFree(pLogStore->data); + + taosLRUCacheEraseUnrefEntries(pLogStore->pCache); + taosLRUCacheCleanup(pLogStore->pCache); + taosMemoryFree(pLogStore); } } @@ -243,7 +256,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - int32_t code; + int32_t code = 0; *ppEntry = NULL; @@ -257,6 +270,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, taosThreadMutexLock(&(pData->mutex)); code = walReadVer(pWalHandle, index); + // code = walReadVerCached(pWalHandle, index); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err); @@ -412,6 +426,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ASSERT(pWalHandle != NULL); int32_t code = walReadVer(pWalHandle, index); + // int32_t code = walReadVerCached(pWalHandle, index); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err);