提交 158ae209 编写于 作者: M Minglei Jin

cache/lru: new applyF API for tlrucache

上级 6d23ea93
......@@ -24,7 +24,8 @@ extern "C" {
typedef struct SLRUCache SLRUCache;
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value);
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value, void *ud);
typedef int (*_taos_lru_functor_t)(const void *key, size_t keyLen, void *value, void *ud);
typedef struct LRUHandle LRUHandle;
......@@ -41,10 +42,11 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
void taosLRUCacheCleanup(SLRUCache *cache);
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority);
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud);
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen);
void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen);
void taosLRUCacheApply(SLRUCache *cache, _taos_lru_functor_t functor, void *ud);
void taosLRUCacheEraseUnrefEntries(SLRUCache *cache);
bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle);
......
......@@ -357,6 +357,11 @@ typedef struct {
STSchema *pTSchema;
} SRocksCache;
typedef struct {
STsdb *pTsdb;
int flush_count;
} SCacheFlushState;
struct STsdb {
char *path;
SVnode *pVnode;
......@@ -366,6 +371,7 @@ struct STsdb {
SMemTable *imem;
STsdbFS fs;
SLRUCache *lruCache;
SCacheFlushState flushState;
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
......
......@@ -151,7 +151,6 @@ int32_t metaCacheOpen(SMeta* pMeta) {
taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
if (pCache->STbGroupResCache.pResCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -169,7 +168,6 @@ int32_t metaCacheOpen(SMeta* pMeta) {
taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);
pMeta->pCache = pCache;
return code;
......@@ -486,14 +484,14 @@ static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInv
}
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
// ASSERT(keyLen == sizeof(int64_t) * 2);
// ASSERT(keyLen == sizeof(int64_t) * 2);
memcpy(&pBuf[2], key, keyLen);
}
// the format of key:
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
buf[0] = (uint64_t) pHashMap;
buf[0] = (uint64_t)pHashMap;
buf[1] = suid;
setMD5DigestInKey(buf, key, keyLen);
ASSERT(keyLen == sizeof(uint64_t) * 2);
......@@ -541,7 +539,8 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
((double)(*pEntry)->hitTimes) / acc);
}
taosLRUCacheRelease(pCache, pHandle, false);
......@@ -551,7 +550,8 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
return TSDB_CODE_SUCCESS;
}
static void freeUidCachePayload(const void* key, size_t keyLen, void* value) {
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
if (value == NULL) {
return;
}
......@@ -659,7 +659,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
// add to cache.
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
......@@ -675,7 +675,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
uint64_t dummy[2] = {0};
initCacheKey(p, pEntryHashMap, suid, (char*) &dummy[0], 16);
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
taosThreadMutexLock(pLock);
......@@ -700,7 +700,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d suid:%"PRId64" cached related tag filter uid list cleared", vgId, suid);
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
return TSDB_CODE_SUCCESS;
}
......@@ -738,7 +738,8 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
((double)(*pEntry)->hitTimes) / acc);
}
taosLRUCacheRelease(pCache, pHandle, false);
......@@ -748,8 +749,8 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
return TSDB_CODE_SUCCESS;
}
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value) {
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
if (value == NULL) {
return;
}
......@@ -778,8 +779,8 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value)
taosMemoryFree(tmp);
double el = (taosGetTimestampUs() - st) / 1000.0;
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
el);
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
listNEles(&((*pEntry)->list)), el);
break;
}
}
......@@ -788,7 +789,6 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value)
taosArrayDestroy((SArray*)value);
}
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen) {
int32_t code = 0;
......@@ -836,7 +836,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
// add to cache.
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
......@@ -852,7 +852,7 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
uint64_t dummy[2] = {0};
initCacheKey(p, pEntryHashMap, suid, (char*) &dummy[0], 16);
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
taosThreadMutexLock(pLock);
......@@ -877,8 +877,6 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d suid:%"PRId64" cached related tb group cleared", vgId, suid);
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
return TSDB_CODE_SUCCESS;
}
......@@ -14,6 +14,8 @@
*/
#include "tsdb.h"
#define ROCKS_BATCH_SIZE (4096)
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
......@@ -226,7 +228,7 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
wb = pTsdb->rCache.writebatch;
}
int count = rocksdb_writebatch_count(wb);
if ((force && count > 0) || count >= 1024) {
if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
......@@ -244,23 +246,7 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
}
}
int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0;
char *err = NULL;
rocksMayWrite(pTsdb, true, false, true);
rocksMayWrite(pTsdb, true, true, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
code = -1;
}
return code;
}
SLastCol *tsdbCacheDeserialize(char const *value) {
static SLastCol *tsdbCacheDeserialize(char const *value) {
if (!value) {
return NULL;
}
......@@ -278,7 +264,7 @@ SLastCol *tsdbCacheDeserialize(char const *value) {
return pLastCol;
}
void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
SColVal *pColVal = &pLastCol->colVal;
size_t length = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pColVal->type)) {
......@@ -300,6 +286,68 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
*size = length;
}
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) {
SCacheFlushState *state = (SCacheFlushState *)ud;
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosMemoryFree(rocks_value);
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
state->flush_count = 0;
}
pLastCol->dirty = 0;
}
return 0;
}
int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0;
char *err = NULL;
SLRUCache *pCache = pTsdb->lruCache;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
taosThreadMutexLock(&pTsdb->lruMutex);
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, wb);
rocksMayWrite(pTsdb, true, false, true);
rocksMayWrite(pTsdb, true, true, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
taosThreadMutexUnlock(&pTsdb->lruMutex);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
code = -1;
}
return code;
}
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
SLastCol *pLastCol = NULL;
......@@ -329,19 +377,44 @@ static void reallocVarData(SColVal *pColVal) {
}
}
static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) {
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
(void)key;
(void)klen;
SLastCol *pLastCol = (SLastCol *)value;
// TODO: add dirty flag to SLastCol
if (pLastCol->dirty) {
// TODO: queue into dirty list, free it after save to backstore
} else {
SCacheFlushState *state = (SCacheFlushState *)ud;
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosMemoryFree(rocks_value);
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
state->flush_count = 0;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) {
taosMemoryFree(pLastCol->colVal.value.pData);
}
taosMemoryFree(value);
}
}
typedef struct {
......@@ -412,12 +485,15 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
}
}
pLastCol->dirty = 1;
/*
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
// tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
}
taosLRUCacheRelease(pCache, h, false);
......@@ -454,11 +530,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
}
}
pLastCol->dirty = 1;
/*
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
}
taosLRUCacheRelease(pCache, h, false);
......@@ -523,7 +602,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge,
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW);
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -551,7 +630,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge,
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW);
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -800,7 +879,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
}
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW);
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -875,7 +954,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
}
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW);
NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -994,7 +1073,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
}
LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
TAOS_LRU_PRIORITY_LOW);
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -1150,7 +1229,8 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
*len = sizeof(uint64_t);
}
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) {
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value, void *ud) {
(void)ud;
SArray *pLastArray = (SArray *)value;
int16_t nCol = taosArrayGetSize(pLastArray);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
......@@ -3234,7 +3314,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -3274,7 +3355,7 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr,
size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -3347,7 +3428,8 @@ static int32_t tsdbCacheLoadBlockIdx(SDataFReader *pFileReader, SArray **aBlockI
return code;
}
static void deleteBICache(const void *key, size_t keyLen, void *value) {
static void deleteBICache(const void *key, size_t keyLen, void *value, void *ud) {
(void)ud;
SArray *pArray = (SArray *)value;
taosArrayDestroy(pArray);
......@@ -3378,7 +3460,8 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteBICache;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......
......@@ -466,7 +466,12 @@ static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
}
// const void *key, size_t keyLen, void *value
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) { freeTableCachedVal(value); }
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value, void* ud) {
(void)key;
(void)keyLen;
(void)ud;
freeTableCachedVal(value);
}
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
for (int32_t j = 0; j < numOfExpr; ++j) {
......@@ -554,7 +559,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
freeReader = true;
int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
if (ret != TAOS_LRU_STATUS_OK) {
qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
freeTableCachedVal(pVal);
......
......@@ -29,7 +29,10 @@ typedef struct {
char buf[0];
} SDataBlock;
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
taosMemoryFree(value);
}
static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
char* p = buf;
......@@ -136,7 +139,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
memcpy(buf + total, blk->buf + blkOffset, nread);
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
TAOS_LRU_PRIORITY_LOW);
TAOS_LRU_PRIORITY_LOW, NULL);
if (s != TAOS_LRU_STATUS_OK) {
return -1;
}
......
......@@ -37,7 +37,6 @@
#include "syncVoteMgr.h"
#include "tglobal.h"
#include "tref.h"
#include "syncUtil.h"
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
......@@ -141,7 +140,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return -1;
if(pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex){
if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
syncNodeRelease(pSyncNode);
sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
......@@ -323,8 +322,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
}
if (pSyncNode->totalReplicaNum > 1) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER
&& pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
lastApplyIndex);
syncNodeRelease(pSyncNode);
......@@ -544,7 +543,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
if (pSyncNode == NULL) return;
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
if(pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
SEp* pEp = &pEpSet->eps[i];
tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
......@@ -579,18 +578,16 @@ int32_t syncIsCatchUp(int64_t rid) {
}
int32_t isCatchUp = 0;
if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){
pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
isCatchUp = 0;
}
else{
sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
} else {
sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
isCatchUp = 1;
}
......@@ -802,8 +799,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->vgId = pSyncInfo->vgId;
SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
bool updated = false;
sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d",
pSyncNode->vgId, pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
SNodeInfo* pNode = &pCfg->nodeInfo[i];
if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
......@@ -1110,10 +1107,9 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
int32_t syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if(pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER){
if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
syncNodeBecomeLearner(pSyncNode, "first start");
}
else{
} else {
if (pSyncNode->replicaNum == 1) {
raftStoreNextTerm(pSyncNode);
syncNodeBecomeLeader(pSyncNode, "one replica start");
......@@ -1438,7 +1434,7 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg
const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
if(pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
}
return false;
......@@ -1476,9 +1472,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// log begin config change
sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
pSyncNode->vgId,
oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum,
oldConfig.lastIndex, pNewConfig->lastIndex);
pSyncNode->vgId, oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex,
pNewConfig->lastIndex);
if (IamInNew) {
pSyncNode->raftCfg.isStandBy = 0; // change isStandBy to normal
......@@ -2234,7 +2229,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
syncNodeRelease(pSyncNode);
}
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
taosMemoryFree(value);
}
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
SSyncLogStoreData* pData = pLogStore->data;
......@@ -2243,7 +2241,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
int32_t code = 0;
int32_t entryLen = sizeof(*pEntry) + pEntry->dataLen;
LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -2409,11 +2407,10 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs;
sTrace(
"vgId:%d, heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
sTrace("vgId:%d, heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId,
DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
if(pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER){
if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
raftStoreSetTerm(ths, pMsg->term);
currentTerm = pMsg->term;
}
......
......@@ -39,6 +39,7 @@ enum {
struct SLRUEntry {
void *value;
_taos_lru_deleter_t deleter;
void *ud;
SLRUEntry *nextHash;
SLRUEntry *next;
SLRUEntry *prev;
......@@ -94,7 +95,7 @@ static void taosLRUEntryFree(SLRUEntry *entry) {
ASSERT(entry->refs == 0);
if (entry->deleter) {
(*entry->deleter)(entry->keyData, entry->keyLength, entry->value);
(*entry->deleter)(entry->keyData, entry->keyLength, entry->value, entry->ud);
}
taosMemoryFree(entry);
......@@ -146,6 +147,25 @@ static void taosLRUEntryTableCleanup(SLRUEntryTable *table) {
taosMemoryFree(table->list);
}
static int taosLRUEntryTableApplyF(SLRUEntryTable *table, _taos_lru_functor_t functor, void *ud) {
int ret = 0;
uint32_t end = 1 << table->lengthBits;
for (uint32_t i = 0; i < end; ++i) {
SLRUEntry *h = table->list[i];
while (h) {
SLRUEntry *n = h->nextHash;
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(h));
ret = functor(h->keyData, h->keyLength, h->value, ud);
if (!ret) {
return ret;
}
h = n;
}
}
return ret;
}
static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)];
while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) {
......@@ -424,7 +444,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash,
void *value, size_t charge, _taos_lru_deleter_t deleter, LRUHandle **handle,
LRUPriority priority) {
LRUPriority priority, void *ud) {
SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen);
if (!e) {
return TAOS_LRU_STATUS_FAIL;
......@@ -433,6 +453,7 @@ static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key,
e->value = value;
e->flags = 0;
e->deleter = deleter;
e->ud = ud;
e->keyLength = keyLen;
e->hash = hash;
e->refs = 0;
......@@ -490,6 +511,18 @@ static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_
}
}
static int taosLRUCacheShardApply(SLRUCacheShard *shard, _taos_lru_functor_t functor, void *ud) {
int ret;
taosThreadMutexLock(&shard->mutex);
ret = taosLRUEntryTableApplyF(&shard->table, functor, ud);
taosThreadMutexUnlock(&shard->mutex);
return ret;
}
static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
......@@ -700,12 +733,12 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
}
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) {
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud) {
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
uint32_t shardIndex = hash & cache->shardedCache.shardMask;
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle,
priority);
priority, ud);
}
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) {
......@@ -722,6 +755,15 @@ void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen) {
return taosLRUCacheShardErase(&cache->shards[shardIndex], key, keyLen, hash);
}
void taosLRUCacheApply(SLRUCache *cache, _taos_lru_functor_t functor, void *ud) {
int numShards = cache->numShards;
for (int i = 0; i < numShards; ++i) {
if (0 != taosLRUCacheShardApply(&cache->shards[i], functor, ud)) {
break;
}
}
}
void taosLRUCacheEraseUnrefEntries(SLRUCache *cache) {
int numShards = cache->numShards;
for (int i = 0; i < numShards; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册