diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e81f63d871e053032aa289076e08a1a1d860f3f6..0e0042d526d4ccbe716508c8f75ad6841d878a5d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -225,6 +225,7 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); +int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4551702b34b1d75c3a7029a277badd6dbbc2081d..df2ccee5461a19db57c51cdc0b367a0d72a8c2a8 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -192,13 +192,34 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SSyncRaftEntry* pAppendEntry = syncEntryBuildFromAppendEntries(pMsg); ASSERT(pAppendEntry != NULL); - SyncIndex appendIndex = pMsg->prevLogIndex + 1; + SyncIndex appendIndex = pMsg->prevLogIndex + 1; + + LRUHandle* hLocal = NULL; + LRUHandle* hAppend = NULL; + + int32_t code = 0; SSyncRaftEntry* pLocalEntry = NULL; - int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry); + SLRUCache* pCache = ths->pLogStore->pCache; + hLocal = taosLRUCacheLookup(pCache, &appendIndex, sizeof(appendIndex)); + if (hLocal) { + pLocalEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hLocal); + code = 0; + + sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", appendIndex, pLocalEntry->bytes, pLocalEntry); + + } else { + sNTrace(ths, "miss cache index:%" PRId64, appendIndex); + + code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry); + } + if (code == 0) { + // get local entry success + if (pLocalEntry->term == pAppendEntry->term) { // do nothing sNTrace(ths, "log match, do nothing, index:%" PRId64, appendIndex); + } else { // truncate code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex); @@ -207,8 +228,18 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { snprintf(logBuf, sizeof(logBuf), "ignore, truncate error, append-index:%" PRId64, appendIndex); syncLogRecvAppendEntries(ths, pMsg, logBuf); - syncEntryDestory(pLocalEntry); - syncEntryDestory(pAppendEntry); + if (hLocal) { + taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false); + } else { + syncEntryDestory(pLocalEntry); + } + + if (hAppend) { + taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false); + } else { + syncEntryDestory(pAppendEntry); + } + goto _IGNORE; } @@ -219,10 +250,22 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex); syncLogRecvAppendEntries(ths, pMsg, logBuf); - syncEntryDestory(pLocalEntry); - syncEntryDestory(pAppendEntry); + if (hLocal) { + taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false); + } else { + syncEntryDestory(pLocalEntry); + } + + if (hAppend) { + taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false); + } else { + syncEntryDestory(pAppendEntry); + } + goto _IGNORE; } + + syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend); } } else { @@ -248,20 +291,42 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex); syncLogRecvAppendEntries(ths, pMsg, logBuf); - syncEntryDestory(pLocalEntry); - syncEntryDestory(pAppendEntry); + if (hLocal) { + taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false); + } else { + syncEntryDestory(pLocalEntry); + } + + if (hAppend) { + taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false); + } else { + syncEntryDestory(pAppendEntry); + } + goto _IGNORE; } + syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend); + } else { - // error + // get local entry success char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%" PRId64 " err:%d", appendIndex, terrno); syncLogRecvAppendEntries(ths, pMsg, logBuf); - syncEntryDestory(pLocalEntry); - syncEntryDestory(pAppendEntry); + if (hLocal) { + taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false); + } else { + syncEntryDestory(pLocalEntry); + } + + if (hAppend) { + taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false); + } else { + syncEntryDestory(pAppendEntry); + } + goto _IGNORE; } } @@ -269,8 +334,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // update match index pReply->matchIndex = pAppendEntry->index; - syncEntryDestory(pLocalEntry); - syncEntryDestory(pAppendEntry); + if (hLocal) { + taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false); + } else { + syncEntryDestory(pLocalEntry); + } + + if (hAppend) { + taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false); + } else { + syncEntryDestory(pAppendEntry); + } } else { // no append entries, do nothing diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index d2320fc6beec2a4fba966f06a0471e168f4cdf49..60bec6ab659cbb92c275465c803a44f26581f484 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -116,7 +116,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index)); if (h) { pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); + + sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", index, pEntry->bytes, pEntry); + } else { + sNTrace(pSyncNode, "miss cache index:%" PRId64, index); + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry); if (code != 0) { sNError(pSyncNode, "advance commit index error, read wal index:%" PRId64, index); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e802f60f30d978ee6ef9cd95cb542d7e7fc10ec5..a59078fff6a95c71ceaadbc1e8d2d8eb3d888a5e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -383,15 +383,33 @@ bool syncIsReadyForRead(int64_t rid) { } else { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { + SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SSyncRaftEntry* pEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + SLRUCache* pCache = pSyncNode->pLogStore->pCache; + LRUHandle* h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex)); + int32_t code = 0; + if (h) { + pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); + code = 0; + + sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry); + + } else { + sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex); + + code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry); + } + if (code == 0 && pEntry != NULL) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { ready = true; } - syncEntryDestory(pEntry); + if (h) { + taosLRUCacheRelease(pCache, h, false); + } else { + syncEntryDestory(pEntry); + } } } } @@ -1761,10 +1779,24 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { return 0; } - SyncTerm preTerm = 0; - SyncIndex preIndex = index - 1; + SyncTerm preTerm = 0; + SyncIndex preIndex = index - 1; + SSyncRaftEntry* pPreEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry); + SLRUCache* pCache = pSyncNode->pLogStore->pCache; + LRUHandle* h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex)); + int32_t code = 0; + if (h) { + pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); + code = 0; + + sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry); + + } else { + sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex); + + code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry); + } SSnapshot snapshot = {.data = NULL, .lastApplyIndex = SYNC_INDEX_INVALID, @@ -1774,7 +1806,13 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { if (code == 0) { ASSERT(pPreEntry != NULL); preTerm = pPreEntry->term; - taosMemoryFree(pPreEntry); + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } else { + syncEntryDestory(pPreEntry); + } + return preTerm; } else { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { @@ -1820,9 +1858,6 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer); - } else { - sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64, - pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser); } } @@ -1856,16 +1891,6 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } taosMemoryFree(pElectTimer); - -#if 0 - // reset timer ms - if (syncIsInit() && pNode->electBaseLine > 0) { - pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer); - } else { - sError("sync env is stop, syncNodeEqElectTimer"); - } -#endif } static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { @@ -1979,7 +2004,10 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); } -static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) { +int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) { + SSyncLogStoreData* pData = pLogStore->data; + sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry); + int32_t code = 0; int32_t entryLen = sizeof(*pEntry) + pEntry->dataLen; LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen, @@ -2000,7 +2028,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { ASSERT(pEntry != NULL); LRUHandle* h = NULL; - syncCacheEntry(ths->pLogStore, pEntry, &h); if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); @@ -2008,6 +2035,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { sError("append noop error"); return -1; } + + syncCacheEntry(ths->pLogStore, pEntry, &h); } if (h) { @@ -2143,7 +2172,6 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn } LRUHandle* h = NULL; - syncCacheEntry(ths->pLogStore, pEntry, &h); if (ths->state == TAOS_SYNC_STATE_LEADER) { // append entry @@ -2183,6 +2211,8 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn } } + syncCacheEntry(ths->pLogStore, pEntry, &h); + // if mulit replica, start replicate right now if (ths->replicaNum > 1) { syncNodeReplicate(ths); @@ -2349,7 +2379,12 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde LRUHandle* h = taosLRUCacheLookup(pCache, &i, sizeof(i)); if (h) { pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); + + sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry); + } else { + sNTrace(ths, "miss cache index:%" PRId64, i); + code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); // ASSERT(code == 0); // ASSERT(pEntry != NULL); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 540054e4731f5990565b3d6424a89b6d05da8a44..a7594091639c3ce69c25abfe993b908e43084c28 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -92,6 +92,8 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) void syncEntryDestory(SSyncRaftEntry* pEntry) { if (pEntry != NULL) { taosMemoryFree(pEntry); + + sTrace("free entry: %p", pEntry); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index b00ba3918ce612125c968b5c87be62cdf64ec742..2b1cee51d70840466ae60779f407b5aaf53df07c 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -37,7 +37,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { return NULL; } - pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5); + // pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5); + pLogStore->pCache = taosLRUCacheInit(100 * 1024 * 1024, 1, .5); if (pLogStore->pCache == NULL) { taosMemoryFree(pLogStore); terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; @@ -321,6 +322,17 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn return 0; } + // delete from cache + for (SyncIndex index = fromIndex; index <= wallastVer; ++index) { + SLRUCache* pCache = pData->pSyncNode->pLogStore->pCache; + LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index)); + if (h) { + sNTrace(pData->pSyncNode, "cache delete index:%" PRId64, index); + + taosLRUCacheRelease(pData->pSyncNode->pLogStore->pCache, h, true); + } + } + int32_t code = walRollback(pWal, fromIndex); if (code != 0) { int32_t err = terrno; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 6a7a2c18c14e94a210867a13365a26e9e23c8aca..802595c55af72956434866ef9686b6f3c84d2a12 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -73,7 +73,20 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh SyncAppendEntries* pMsg = NULL; SSyncRaftEntry* pEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); + SLRUCache* pCache = pSyncNode->pLogStore->pCache; + LRUHandle* h = taosLRUCacheLookup(pCache, &nextIndex, sizeof(nextIndex)); + int32_t code = 0; + if (h) { + pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); + code = 0; + + sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", nextIndex, pEntry->bytes, pEntry); + + } else { + sNTrace(pSyncNode, "miss cache index:%" PRId64, nextIndex); + + code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); + } if (code == 0) { ASSERT(pEntry != NULL); @@ -99,7 +112,11 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh } } - syncEntryDestory(pEntry); + if (h) { + taosLRUCacheRelease(pCache, h, false); + } else { + syncEntryDestory(pEntry); + } // prepare msg ASSERT(pMsg != NULL); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index b9a271ab9d135421fea3d87b8d6afcf2ce79e55e..fb1b07b0b62cab4ef72221827d4507a91a569eee 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -212,7 +212,11 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo } char cfgStr[1024]; - syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr)); + if (pNode->pRaftCfg != NULL) { + syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr)); + } else { + return; + } char peerStr[1024] = "{"; syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); @@ -230,17 +234,19 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo // restore error code terrno = errCode; - taosPrintLog(flags, level, dflag, - "vgId:%d, sync %s " - "%s" - ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 - ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, - logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, - pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + if (pNode != NULL && pNode->pRaftCfg != NULL) { + taosPrintLog(flags, level, dflag, + "vgId:%d, sync %s " + "%s" + ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 + ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 + ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", + pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, + logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, + pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, + pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + } } void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, @@ -364,9 +370,9 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); + "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 + ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {