提交 c5aa7e07 编写于 作者: M Minghao Li

refactor(sync): optimize, make LRU Cache hit more

上级 d31d05fe
......@@ -225,6 +225,7 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
......@@ -193,12 +193,33 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(pAppendEntry != NULL);
SyncIndex appendIndex = pMsg->prevLogIndex + 1;
LRUHandle* hLocal = NULL;
LRUHandle* hAppend = NULL;
int32_t code = 0;
SSyncRaftEntry* pLocalEntry = NULL;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
SLRUCache* pCache = ths->pLogStore->pCache;
hLocal = taosLRUCacheLookup(pCache, &appendIndex, sizeof(appendIndex));
if (hLocal) {
pLocalEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hLocal);
code = 0;
sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", appendIndex, pLocalEntry->bytes, pLocalEntry);
} else {
sNTrace(ths, "miss cache index:%" PRId64, appendIndex);
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
if (code == 0) {
// get local entry success
if (pLocalEntry->term == pAppendEntry->term) {
// do nothing
sNTrace(ths, "log match, do nothing, index:%" PRId64, appendIndex);
} else {
// truncate
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
......@@ -207,8 +228,18 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
snprintf(logBuf, sizeof(logBuf), "ignore, truncate error, append-index:%" PRId64, appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
goto _IGNORE;
......@@ -219,10 +250,22 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
goto _IGNORE;
syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend);
} else {
......@@ -248,20 +291,42 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
goto _IGNORE;
syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend);
} else {
// error
// get local entry success
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%" PRId64 " err:%d", appendIndex,
syncLogRecvAppendEntries(ths, pMsg, logBuf);
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
goto _IGNORE;
......@@ -269,8 +334,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// update match index
pReply->matchIndex = pAppendEntry->index;
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
} else {
// no append entries, do nothing
......@@ -116,7 +116,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", index, pEntry->bytes, pEntry);
} else {
sNTrace(pSyncNode, "miss cache index:%" PRId64, index);
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
sNError(pSyncNode, "advance commit index error, read wal index:%" PRId64, index);
......@@ -383,19 +383,37 @@ bool syncIsReadyForRead(int64_t rid) {
} else {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
int32_t code = 0;
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
} else {
sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
ready = true;
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
if (!ready) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
......@@ -1763,8 +1781,22 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
SyncTerm preTerm = 0;
SyncIndex preIndex = index - 1;
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
int32_t code = 0;
if (h) {
pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
} else {
sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
SSnapshot snapshot = {.data = NULL,
.lastApplyIndex = SYNC_INDEX_INVALID,
......@@ -1774,7 +1806,13 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
if (code == 0) {
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
return preTerm;
} else {
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
......@@ -1820,9 +1858,6 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
} else {
sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64,
pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser);
......@@ -1856,16 +1891,6 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
#if 0
// reset timer ms
if (syncIsInit() && pNode->electBaseLine > 0) {
pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine);
taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer);
} else {
sError("sync env is stop, syncNodeEqElectTimer");
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
......@@ -1979,7 +2004,10 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
SSyncLogStoreData* pData = pLogStore->data;
sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);
int32_t code = 0;
int32_t entryLen = sizeof(*pEntry) + pEntry->dataLen;
LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
......@@ -2000,7 +2028,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
ASSERT(pEntry != NULL);
LRUHandle* h = NULL;
syncCacheEntry(ths->pLogStore, pEntry, &h);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
......@@ -2008,6 +2035,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
sError("append noop error");
return -1;
syncCacheEntry(ths->pLogStore, pEntry, &h);
if (h) {
......@@ -2143,7 +2172,6 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
LRUHandle* h = NULL;
syncCacheEntry(ths->pLogStore, pEntry, &h);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
// append entry
......@@ -2183,6 +2211,8 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
syncCacheEntry(ths->pLogStore, pEntry, &h);
// if mulit replica, start replicate right now
if (ths->replicaNum > 1) {
......@@ -2349,7 +2379,12 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
LRUHandle* h = taosLRUCacheLookup(pCache, &i, sizeof(i));
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);
} else {
sNTrace(ths, "miss cache index:%" PRId64, i);
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
// ASSERT(code == 0);
// ASSERT(pEntry != NULL);
......@@ -92,6 +92,8 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
void syncEntryDestory(SSyncRaftEntry* pEntry) {
if (pEntry != NULL) {
sTrace("free entry: %p", pEntry);
......@@ -37,7 +37,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
return NULL;
pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
// pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
pLogStore->pCache = taosLRUCacheInit(100 * 1024 * 1024, 1, .5);
if (pLogStore->pCache == NULL) {
......@@ -321,6 +322,17 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
return 0;
// delete from cache
for (SyncIndex index = fromIndex; index <= wallastVer; ++index) {
SLRUCache* pCache = pData->pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
if (h) {
sNTrace(pData->pSyncNode, "cache delete index:%" PRId64, index);
taosLRUCacheRelease(pData->pSyncNode->pLogStore->pCache, h, true);
int32_t code = walRollback(pWal, fromIndex);
if (code != 0) {
int32_t err = terrno;
......@@ -73,7 +73,20 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &nextIndex, sizeof(nextIndex));
int32_t code = 0;
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", nextIndex, pEntry->bytes, pEntry);
} else {
sNTrace(pSyncNode, "miss cache index:%" PRId64, nextIndex);
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
if (code == 0) {
ASSERT(pEntry != NULL);
......@@ -99,7 +112,11 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
// prepare msg
......@@ -212,7 +212,11 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
char cfgStr[1024];
if (pNode->pRaftCfg != NULL) {
syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));
} else {
char peerStr[1024] = "{";
syncPeerState2Str(pNode, peerStr, sizeof(peerStr));
......@@ -230,6 +234,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
// restore error code
terrno = errCode;
if (pNode != NULL && pNode->pRaftCfg != NULL) {
taosPrintLog(flags, level, dflag,
"vgId:%d, sync %s "
......@@ -241,6 +246,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
......@@ -364,9 +370,9 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64
", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
