提交 a55f40a1 编写于 作者: M Minglei Jin

feat/tsdbCache: the first round tsdbCache

上级 4cacd72f
...@@ -41,6 +41,7 @@ target_sources( ...@@ -41,6 +41,7 @@ target_sources(
"src/tsdb/tsdbMemTable.c" "src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbRead.c" "src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbCache.c"
"src/tsdb/tsdbWrite.c" "src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbReaderWriter.c" "src/tsdb/tsdbReaderWriter.c"
"src/tsdb/tsdbUtil.c" "src/tsdb/tsdbUtil.c"
......
...@@ -198,6 +198,13 @@ int32_t tsdbDelFReaderClose(SDelFReader *pReader); ...@@ -198,6 +198,13 @@ int32_t tsdbDelFReaderClose(SDelFReader *pReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDataMap, uint8_t **ppBuf); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDataMap, uint8_t **ppBuf);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppBuf); int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppBuf);
// tsdbCache
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid);
// structs ======================= // structs =======================
typedef struct { typedef struct {
int minFid; int minFid;
...@@ -218,6 +225,7 @@ struct STsdb { ...@@ -218,6 +225,7 @@ struct STsdb {
SMemTable *imem; SMemTable *imem;
SRtn rtn; SRtn rtn;
STsdbFS *fs; STsdbFS *fs;
SLRUCache *lruCache;
}; };
struct STable { struct STable {
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include "tmallocator.h" #include "tmallocator.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tlrucache.h"
#include "tstream.h" #include "tstream.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = NULL;
size_t cfgCapacity = 1024 * 1024; // TODO: get cfg from tsdb config
pCache = taosLRUCacheInit(cfgCapacity, -1, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, true);
_err:
pTsdb->lruCache = pCache;
return code;
}
void tsdbCloseCache(SLRUCache *pCache) {
if (pCache) {
taosLRUCacheEraseUnrefEntries(pCache);
taosLRUCacheCleanup(pCache);
}
}
static void getTableCacheKey(tb_uid_t uid, const char *cacheType, char *key, int *len) {
int keyLen = 0;
snprintf(key, 30, "%"PRIi64 "%s", uid, cacheType);
*len = strlen(key);
}
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) {
taosMemoryFree(value);
}
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
int32_t code = 0;
STSRow *cacheRow = NULL;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
cacheRow = (STSRow *) taosLRUCacheValue(pCache, h);
if (row->ts >= cacheRow->ts) {
if (row->ts > cacheRow->ts) {
tdRowCpy(cacheRow, row);
}
}
} else {
cacheRow = tdRowDup(row);
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow),
deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
return code;
}
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
*ppRow = (STSRow *) taosLRUCacheValue(pCache, h);
} else {
// TODO: load lastrow from mem, imem, and files
// if table's empty, return code of -1
code = -1;
}
return code;
}
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
taosLRUCacheRelease(pCache, h, true);
//void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
...@@ -143,6 +143,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid ...@@ -143,6 +143,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
SMemTable *pMemTable = pTsdb->mem; SMemTable *pMemTable = pTsdb->mem;
STbData *pTbData = NULL; STbData *pTbData = NULL;
SVBufPool *pPool = pTsdb->pVnode->inUse; SVBufPool *pPool = pTsdb->pVnode->inUse;
TSDBKEY lastKey = {.version = version, .ts = eKey};
// check if table exists (todo) // check if table exists (todo)
...@@ -173,6 +174,10 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid ...@@ -173,6 +174,10 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDel++; pMemTable->nDel++;
if (tsdbKeyCmprFn(&lastKey, &pTbData->info.maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid);
}
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" since %s", " since %s",
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
...@@ -496,6 +501,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i ...@@ -496,6 +501,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
SMemSkipListNode *pos[SL_MAX_LEVEL]; SMemSkipListNode *pos[SL_MAX_LEVEL];
TSDBROW row = tsdbRowFromTSRow(version, NULL); TSDBROW row = tsdbRowFromTSRow(version, NULL);
int32_t nRow = 0; int32_t nRow = 0;
STSRow *pLastRow = NULL;
tInitSubmitBlkIter(pMsgIter, pBlock, &blkIter); tInitSubmitBlkIter(pMsgIter, pBlock, &blkIter);
...@@ -517,6 +523,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i ...@@ -517,6 +523,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
pMemTable->info.minKey = key; pMemTable->info.minKey = key;
} }
pLastRow = row.pTSRow;
// forward put rest data // forward put rest data
row.pTSRow = tGetSubmitBlkNext(&blkIter); row.pTSRow = tGetSubmitBlkNext(&blkIter);
if (row.pTSRow) { if (row.pTSRow) {
...@@ -532,12 +540,18 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i ...@@ -532,12 +540,18 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
goto _err; goto _err;
} }
pLastRow = row.pTSRow;
row.pTSRow = tGetSubmitBlkNext(&blkIter); row.pTSRow = tGetSubmitBlkNext(&blkIter);
} while (row.pTSRow); } while (row.pTSRow);
} }
if (tsdbKeyCmprFn(&key, &pTbData->info.maxKey) > 0) { if (tsdbKeyCmprFn(&key, &pTbData->info.maxKey) > 0) {
pTbData->info.maxKey = key; pTbData->info.maxKey = key;
if (pLastRow) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow);
}
} }
if (tsdbKeyCmprFn(&key, &pMemTable->info.maxKey) > 0) { if (tsdbKeyCmprFn(&key, &pMemTable->info.maxKey) > 0) {
......
...@@ -73,6 +73,10 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee ...@@ -73,6 +73,10 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
goto _err; goto _err;
} }
if (tsdbOpenCache(pTsdb) < 0) {
goto _err;
}
tsdbDebug("vgId:%d, tsdb is opened for %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, tsdbDebug("vgId:%d, tsdb is opened for %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2); pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
...@@ -91,6 +95,7 @@ int tsdbClose(STsdb **pTsdb) { ...@@ -91,6 +95,7 @@ int tsdbClose(STsdb **pTsdb) {
tsdbFSClose((*pTsdb)->fs); tsdbFSClose((*pTsdb)->fs);
// tsdbFreeFS((*pTsdb)->fs); // tsdbFreeFS((*pTsdb)->fs);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);
tsdbCloseCache((*pTsdb)->lruCache);
} }
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册