提交 7fab74da 编写于 作者: S shenglian zhou

enhance: add tsdb stt block data cache

上级 95a18a3c
...@@ -376,6 +376,8 @@ struct STsdb { ...@@ -376,6 +376,8 @@ struct STsdb {
SLRUCache *biCache; SLRUCache *biCache;
TdThreadMutex biMutex; TdThreadMutex biMutex;
SRocksCache rCache; SRocksCache rCache;
SLRUCache* sttBlockCache;
SLRUCache* sttBlkCache;
}; };
struct TSDBKEY { struct TSDBKEY {
...@@ -705,6 +707,7 @@ typedef struct SSttBlockLoadInfo { ...@@ -705,6 +707,7 @@ typedef struct SSttBlockLoadInfo {
SArray *aSttBlk; SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex; int32_t currentLoadBlockIndex;
LRUHandle *blockDataHandle;
int32_t loadBlocks; int32_t loadBlocks;
double elapsedTime; double elapsedTime;
STSchema *pSchema; STSchema *pSchema;
......
...@@ -1210,12 +1210,15 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { ...@@ -1210,12 +1210,15 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
pTsdb->flushState.pTsdb = pTsdb; pTsdb->flushState.pTsdb = pTsdb;
pTsdb->flushState.flush_count = 0; pTsdb->flushState.flush_count = 0;
pTsdb->sttBlockCache = taosLRUCacheInit(128 * 1024 * 1024, -1, 0.0);
pTsdb->sttBlkCache = taosLRUCacheInit(4 * 1024 * 1024, -1, 0.0);
_err: _err:
pTsdb->lruCache = pCache; pTsdb->lruCache = pCache;
return code; return code;
} }
void tsdbCloseCache(STsdb *pTsdb) { void tsdbCloseCache(STsdb *pTsdb) {
taosLRUCacheCleanup(pTsdb->sttBlockCache);
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
if (pCache) { if (pCache) {
taosLRUCacheEraseUnrefEntries(pCache); taosLRUCacheEraseUnrefEntries(pCache);
......
...@@ -90,11 +90,52 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { ...@@ -90,11 +90,52 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
taosMemoryFree(pLoadInfo); taosMemoryFree(pLoadInfo);
return NULL; return NULL;
} }
static void deleteSttBlockDataCache(const void *key, size_t keyLen, void *value, void *ud) {
SBlockData* pBlockData = value;
tBlockDataDestroy(pBlockData);
}
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
int32_t code = 0;
// (pReader->pSet->fid, iStt, pReader->pSet->pHeadF->commitID) -> aSttBlk global cache
// (pIter->pReader->pSet->fid, pIter->iStt, pIter->pReader->pSet->pHeadF->commitID, pIter->pSttBlk->bInfo.offset) -> SBlockData
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo; SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
pInfo->blockDataHandle = NULL;
return NULL;
}
struct {
int32_t fid;
int32_t iStt;
int64_t cid;
int64_t offset;
} key = {.fid = pIter->pReader->pSet->fid, .iStt = pIter->iStt, .cid = pIter->pReader->pSet->pHeadF->commitID, .offset = pIter->pSttBlk->bInfo.offset};
int32_t code = 0;
LRUHandle* h = taosLRUCacheLookup(pIter->pReader->pTsdb->sttBlockCache, &key, sizeof(key));
if (!h) {
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
SBlockData* pBlockData = taosMemoryMalloc(sizeof(SBlockData));
TABLEID id = {0};
if (pIter->pSttBlk->suid != 0) {
id.suid = pIter->pSttBlk->suid;
} else {
id.uid = pIter->uid;
}
tBlockDataInit(pBlockData, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
int charge = 2 * 1024 * 1024; //TODO
taosLRUCacheInsert(pIter->pReader->pTsdb->sttBlockCache, &key, sizeof(key), pBlockData, charge, deleteSttBlockDataCache, &h, TAOS_LRU_PRIORITY_LOW, NULL);
}
SBlockData* pBlockData = taosLRUCacheValue(pIter->pReader->pTsdb->sttBlockCache, h);
pInfo->blockDataHandle = h;
return pBlockData;
if (pInfo->blockIndex[0] == pIter->iSttBlk) { if (pInfo->blockIndex[0] == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 0) { if (pInfo->currentLoadBlockIndex != 0) {
tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s", tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
...@@ -271,7 +312,7 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t ...@@ -271,7 +312,7 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t
if (!pBlockLoadInfo->sttBlockLoaded) { if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
pBlockLoadInfo->sttBlockLoaded = true; pBlockLoadInfo->sttBlockLoaded = true;
// (pReader->pSet->fid, iStt, pReader->pSet->pHeadF->commitID, iStt) -> aSttBlk global cache
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk); code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
if (code) { if (code) {
return code; return code;
...@@ -512,6 +553,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { ...@@ -512,6 +553,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
} }
if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) { if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
taosLRUCacheRelease(pIter->pReader->pTsdb->sttBlockCache, pIter->pBlockLoadInfo->blockDataHandle, false);
tLDataIterNextBlock(pIter, idStr); tLDataIterNextBlock(pIter, idStr);
if (pIter->pSttBlk == NULL) { // no more data if (pIter->pSttBlk == NULL) { // no more data
goto _exit; goto _exit;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册