未验证 提交 d5f37359 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #21722 from taosdata/fix/TD-24761

fix(tsdb/cache): rewrite cache update to fix cpu usage
......@@ -24,7 +24,8 @@ extern "C" {
typedef struct SLRUCache SLRUCache;
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value);
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value, void *ud);
typedef int (*_taos_lru_functor_t)(const void *key, size_t keyLen, void *value, void *ud);
typedef struct LRUHandle LRUHandle;
......@@ -41,10 +42,11 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
void taosLRUCacheCleanup(SLRUCache *cache);
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority);
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud);
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen);
void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen);
void taosLRUCacheApply(SLRUCache *cache, _taos_lru_functor_t functor, void *ud);
void taosLRUCacheEraseUnrefEntries(SLRUCache *cache);
bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle);
......
......@@ -357,19 +357,25 @@ typedef struct {
STSchema *pTSchema;
} SRocksCache;
typedef struct {
STsdb *pTsdb;
int flush_count;
} SCacheFlushState;
struct STsdb {
char *path;
SVnode *pVnode;
STsdbKeepCfg keepCfg;
TdThreadRwlock rwLock;
SMemTable *mem;
SMemTable *imem;
STsdbFS fs;
SLRUCache *lruCache;
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
SRocksCache rCache;
char *path;
SVnode *pVnode;
STsdbKeepCfg keepCfg;
TdThreadRwlock rwLock;
SMemTable *mem;
SMemTable *imem;
STsdbFS fs;
SLRUCache *lruCache;
SCacheFlushState flushState;
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
SRocksCache rCache;
};
struct TSDBKEY {
......
......@@ -151,7 +151,6 @@ int32_t metaCacheOpen(SMeta* pMeta) {
taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
if (pCache->STbGroupResCache.pResCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -169,7 +168,6 @@ int32_t metaCacheOpen(SMeta* pMeta) {
taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);
pMeta->pCache = pCache;
return code;
......@@ -486,14 +484,14 @@ static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInv
}
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
// ASSERT(keyLen == sizeof(int64_t) * 2);
// ASSERT(keyLen == sizeof(int64_t) * 2);
memcpy(&pBuf[2], key, keyLen);
}
// the format of key:
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
buf[0] = (uint64_t) pHashMap;
buf[0] = (uint64_t)pHashMap;
buf[1] = suid;
setMD5DigestInKey(buf, key, keyLen);
ASSERT(keyLen == sizeof(uint64_t) * 2);
......@@ -501,7 +499,7 @@ static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid,
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
// generate the composed key for LRU cache
......@@ -541,7 +539,8 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
((double)(*pEntry)->hitTimes) / acc);
}
taosLRUCacheRelease(pCache, pHandle, false);
......@@ -551,7 +550,8 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
return TSDB_CODE_SUCCESS;
}
static void freeUidCachePayload(const void* key, size_t keyLen, void* value) {
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
if (value == NULL) {
return;
}
......@@ -607,7 +607,7 @@ static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyL
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen, double selectivityRatio) {
int32_t code = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
if (selectivityRatio > tsSelectivityRatio) {
......@@ -640,7 +640,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
} else { // check if it exists or not
} else { // check if it exists or not
size_t size = listNEles(&(*pEntry)->list);
if (size == 0) {
tdListAppend(&(*pEntry)->list, pKey);
......@@ -659,7 +659,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
// add to cache.
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
......@@ -675,7 +675,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
uint64_t dummy[2] = {0};
initCacheKey(p, pEntryHashMap, suid, (char*) &dummy[0], 16);
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
taosThreadMutexLock(pLock);
......@@ -700,12 +700,12 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d suid:%"PRId64" cached related tag filter uid list cleared", vgId, suid);
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
return TSDB_CODE_SUCCESS;
}
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
// generate the composed key for LRU cache
......@@ -738,7 +738,8 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
((double)(*pEntry)->hitTimes) / acc);
}
taosLRUCacheRelease(pCache, pHandle, false);
......@@ -748,8 +749,8 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
return TSDB_CODE_SUCCESS;
}
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value) {
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
if (value == NULL) {
return;
}
......@@ -778,8 +779,8 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value)
taosMemoryFree(tmp);
double el = (taosGetTimestampUs() - st) / 1000.0;
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
el);
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
listNEles(&((*pEntry)->list)), el);
break;
}
}
......@@ -788,11 +789,10 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value)
taosArrayDestroy((SArray*)value);
}
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen) {
int32_t code = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
if (payloadLen > tsTagFilterResCacheSize) {
......@@ -817,7 +817,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
} else { // check if it exists or not
} else { // check if it exists or not
size_t size = listNEles(&(*pEntry)->list);
if (size == 0) {
tdListAppend(&(*pEntry)->list, pKey);
......@@ -836,7 +836,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
// add to cache.
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
......@@ -852,7 +852,7 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
uint64_t dummy[2] = {0};
initCacheKey(p, pEntryHashMap, suid, (char*) &dummy[0], 16);
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
taosThreadMutexLock(pLock);
......@@ -877,8 +877,6 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d suid:%"PRId64" cached related tb group cleared", vgId, suid);
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
return TSDB_CODE_SUCCESS;
}
......@@ -466,7 +466,12 @@ static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
}
// const void *key, size_t keyLen, void *value
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) { freeTableCachedVal(value); }
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value, void* ud) {
(void)key;
(void)keyLen;
(void)ud;
freeTableCachedVal(value);
}
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
for (int32_t j = 0; j < numOfExpr; ++j) {
......@@ -554,7 +559,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
freeReader = true;
int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
if (ret != TAOS_LRU_STATUS_OK) {
qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
freeTableCachedVal(pVal);
......
......@@ -29,7 +29,10 @@ typedef struct {
char buf[0];
} SDataBlock;
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
taosMemoryFree(value);
}
static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
char* p = buf;
......@@ -136,7 +139,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
memcpy(buf + total, blk->buf + blkOffset, nread);
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
TAOS_LRU_PRIORITY_LOW);
TAOS_LRU_PRIORITY_LOW, NULL);
if (s != TAOS_LRU_STATUS_OK) {
return -1;
}
......
......@@ -37,7 +37,6 @@
#include "syncVoteMgr.h"
#include "tglobal.h"
#include "tref.h"
#include "syncUtil.h"
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
......@@ -141,10 +140,10 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return -1;
if(pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex){
if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
syncNodeRelease(pSyncNode);
sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
return 0;
}
......@@ -323,8 +322,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
}
if (pSyncNode->totalReplicaNum > 1) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER
&& pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
lastApplyIndex);
syncNodeRelease(pSyncNode);
......@@ -544,7 +543,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
if (pSyncNode == NULL) return;
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
if(pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
SEp* pEp = &pEpSet->eps[i];
tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
......@@ -579,21 +578,19 @@ int32_t syncIsCatchUp(int64_t rid) {
}
int32_t isCatchUp = 0;
if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){
sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
isCatchUp = 0;
}
else{
sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
} else {
sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
isCatchUp = 1;
}
syncNodeRelease(pSyncNode);
return isCatchUp;
}
......@@ -606,7 +603,7 @@ ESyncRole syncGetRole(int64_t rid) {
}
ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
syncNodeRelease(pSyncNode);
return role;
}
......@@ -802,8 +799,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->vgId = pSyncInfo->vgId;
SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
bool updated = false;
sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d",
pSyncNode->vgId, pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
SNodeInfo* pNode = &pCfg->nodeInfo[i];
if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
......@@ -1110,10 +1107,9 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
int32_t syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if(pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER){
if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
syncNodeBecomeLearner(pSyncNode, "first start");
}
else{
} else {
if (pSyncNode->replicaNum == 1) {
raftStoreNextTerm(pSyncNode);
syncNodeBecomeLeader(pSyncNode, "one replica start");
......@@ -1122,7 +1118,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
syncNodeAppendNoop(pSyncNode);
} else {
syncNodeBecomeFollower(pSyncNode, "first start");
}
}
}
int32_t ret = 0;
......@@ -1438,7 +1434,7 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg
const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
if(pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
}
return false;
......@@ -1475,10 +1471,9 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
// log begin config change
sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
pSyncNode->vgId,
oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum,
oldConfig.lastIndex, pNewConfig->lastIndex);
sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
pSyncNode->vgId, oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex,
pNewConfig->lastIndex);
if (IamInNew) {
pSyncNode->raftCfg.isStandBy = 0; // change isStandBy to normal
......@@ -2234,7 +2229,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
syncNodeRelease(pSyncNode);
}
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
taosMemoryFree(value);
}
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
SSyncLogStoreData* pData = pLogStore->data;
......@@ -2243,7 +2241,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
int32_t code = 0;
int32_t entryLen = sizeof(*pEntry) + pEntry->dataLen;
LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -2409,11 +2407,10 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs;
sTrace(
"vgId:%d, heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
sTrace("vgId:%d, heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId,
DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
if(pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER){
if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
raftStoreSetTerm(ths, pMsg->term);
currentTerm = pMsg->term;
}
......
......@@ -39,6 +39,7 @@ enum {
struct SLRUEntry {
void *value;
_taos_lru_deleter_t deleter;
void *ud;
SLRUEntry *nextHash;
SLRUEntry *next;
SLRUEntry *prev;
......@@ -94,7 +95,7 @@ static void taosLRUEntryFree(SLRUEntry *entry) {
ASSERT(entry->refs == 0);
if (entry->deleter) {
(*entry->deleter)(entry->keyData, entry->keyLength, entry->value);
(*entry->deleter)(entry->keyData, entry->keyLength, entry->value, entry->ud);
}
taosMemoryFree(entry);
......@@ -146,6 +147,25 @@ static void taosLRUEntryTableCleanup(SLRUEntryTable *table) {
taosMemoryFree(table->list);
}
static int taosLRUEntryTableApplyF(SLRUEntryTable *table, _taos_lru_functor_t functor, void *ud) {
int ret = 0;
uint32_t end = 1 << table->lengthBits;
for (uint32_t i = 0; i < end; ++i) {
SLRUEntry *h = table->list[i];
while (h) {
SLRUEntry *n = h->nextHash;
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(h));
ret = functor(h->keyData, h->keyLength, h->value, ud);
if (ret) {
return ret;
}
h = n;
}
}
return ret;
}
static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)];
while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) {
......@@ -424,7 +444,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash,
void *value, size_t charge, _taos_lru_deleter_t deleter, LRUHandle **handle,
LRUPriority priority) {
LRUPriority priority, void *ud) {
SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen);
if (!e) {
return TAOS_LRU_STATUS_FAIL;
......@@ -433,6 +453,7 @@ static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key,
e->value = value;
e->flags = 0;
e->deleter = deleter;
e->ud = ud;
e->keyLength = keyLen;
e->hash = hash;
e->refs = 0;
......@@ -490,6 +511,18 @@ static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_
}
}
static int taosLRUCacheShardApply(SLRUCacheShard *shard, _taos_lru_functor_t functor, void *ud) {
int ret;
taosThreadMutexLock(&shard->mutex);
ret = taosLRUEntryTableApplyF(&shard->table, functor, ud);
taosThreadMutexUnlock(&shard->mutex);
return ret;
}
static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
......@@ -700,12 +733,12 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
}
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) {
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud) {
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
uint32_t shardIndex = hash & cache->shardedCache.shardMask;
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle,
priority);
priority, ud);
}
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) {
......@@ -722,6 +755,15 @@ void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen) {
return taosLRUCacheShardErase(&cache->shards[shardIndex], key, keyLen, hash);
}
void taosLRUCacheApply(SLRUCache *cache, _taos_lru_functor_t functor, void *ud) {
int numShards = cache->numShards;
for (int i = 0; i < numShards; ++i) {
if (taosLRUCacheShardApply(&cache->shards[i], functor, ud)) {
break;
}
}
}
void taosLRUCacheEraseUnrefEntries(SLRUCache *cache) {
int numShards = cache->numShards;
for (int i = 0; i < numShards; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册