提交 8bc6a629 编写于 作者: M Minglei Jin

enh(sync): log entry cache for sync

上级 b08520d9
...@@ -22,6 +22,7 @@ extern "C" { ...@@ -22,6 +22,7 @@ extern "C" {
#include "cJSON.h" #include "cJSON.h"
#include "tdef.h" #include "tdef.h"
#include "tlrucache.h"
#include "tmsgcb.h" #include "tmsgcb.h"
extern bool gRaftDetailLog; extern bool gRaftDetailLog;
...@@ -153,7 +154,8 @@ typedef struct SSyncFSM { ...@@ -153,7 +154,8 @@ typedef struct SSyncFSM {
// abstract definition of log store in raft // abstract definition of log store in raft
// SWal implements it // SWal implements it
typedef struct SSyncLogStore { typedef struct SSyncLogStore {
void* data; SLRUCache* pCache;
void* data;
// append one log entry // append one log entry
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
......
...@@ -69,15 +69,26 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -69,15 +69,26 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (agree) { if (agree) {
// term // term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); SSyncRaftEntry* pEntry = NULL;
ASSERT(pEntry != NULL); SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
} else {
pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
ASSERT(pEntry != NULL);
}
// cannot commit, even if quorum agree. need check term! // cannot commit, even if quorum agree. need check term!
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
// update commit index // update commit index
newCommitIndex = index; newCommitIndex = index;
syncEntryDestory(pEntry); if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestory(pEntry);
}
break; break;
} else { } else {
do { do {
...@@ -88,7 +99,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -88,7 +99,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
} while (0); } while (0);
} }
syncEntryDestory(pEntry); if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestory(pEntry);
}
} }
} }
......
...@@ -2581,6 +2581,20 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { ...@@ -2581,6 +2581,20 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
return ret; return ret;
} }
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
int code = 0;
int entryLen = sizeof(*pEntry) + pEntry->dataLen;
LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
return code;
}
static int32_t syncNodeAppendNoop(SSyncNode* ths) { static int32_t syncNodeAppendNoop(SSyncNode* ths) {
int32_t ret = 0; int32_t ret = 0;
...@@ -2589,13 +2603,21 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { ...@@ -2589,13 +2603,21 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
LRUHandle* h = NULL;
syncCacheEntry(ths->pLogStore, pEntry, &h);
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
ASSERT(code == 0); ASSERT(code == 0);
syncNodeReplicate(ths, false); syncNodeReplicate(ths, false);
} }
syncEntryDestory(pEntry); if (h) {
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
} else {
syncEntryDestory(pEntry);
}
return ret; return ret;
} }
...@@ -2654,6 +2676,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI ...@@ -2654,6 +2676,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
LRUHandle* h = NULL;
syncCacheEntry(ths->pLogStore, pEntry, &h);
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
// append entry // append entry
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
...@@ -2685,7 +2710,12 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI ...@@ -2685,7 +2710,12 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
} }
} }
syncEntryDestory(pEntry); if (h) {
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
} else {
syncEntryDestory(pEntry);
}
return ret; return ret;
} }
...@@ -2973,9 +3003,15 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -2973,9 +3003,15 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
for (SyncIndex i = beginIndex; i <= endIndex; ++i) { for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) { if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry; SSyncRaftEntry* pEntry;
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); SLRUCache* pCache = ths->pLogStore->pCache;
ASSERT(code == 0); LRUHandle* h = taosLRUCacheLookup(pCache, &i, sizeof(i));
ASSERT(pEntry != NULL); if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
} else {
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
ASSERT(code == 0);
ASSERT(pEntry != NULL);
}
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
...@@ -3058,7 +3094,11 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -3058,7 +3094,11 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry); if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestory(pEntry);
}
} }
} }
} }
......
...@@ -53,6 +53,15 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { ...@@ -53,6 +53,15 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
ASSERT(pLogStore != NULL); ASSERT(pLogStore != NULL);
pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
if (pLogStore->pCache == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosMemoryFree(pLogStore);
return NULL;
}
taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
ASSERT(pLogStore->data != NULL); ASSERT(pLogStore->data != NULL);
...@@ -102,6 +111,10 @@ void logStoreDestory(SSyncLogStore* pLogStore) { ...@@ -102,6 +111,10 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
taosThreadMutexDestroy(&(pData->mutex)); taosThreadMutexDestroy(&(pData->mutex));
taosMemoryFree(pLogStore->data); taosMemoryFree(pLogStore->data);
taosLRUCacheEraseUnrefEntries(pLogStore->pCache);
taosLRUCacheCleanup(pLogStore->pCache);
taosMemoryFree(pLogStore); taosMemoryFree(pLogStore);
} }
} }
...@@ -243,7 +256,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -243,7 +256,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) { static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
int32_t code; int32_t code = 0;
*ppEntry = NULL; *ppEntry = NULL;
...@@ -257,6 +270,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, ...@@ -257,6 +270,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
code = walReadVer(pWalHandle, index); code = walReadVer(pWalHandle, index);
// code = walReadVerCached(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
...@@ -412,6 +426,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -412,6 +426,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
ASSERT(pWalHandle != NULL); ASSERT(pWalHandle != NULL);
int32_t code = walReadVer(pWalHandle, index); int32_t code = walReadVer(pWalHandle, index);
// int32_t code = walReadVerCached(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册