提交 ec4ea9cf 编写于 作者: M Minglei Jin

enh(cache/rocks): base for rocks put/get

上级 c1412648
...@@ -438,6 +438,7 @@ endif(${BUILD_ADDR2LINE}) ...@@ -438,6 +438,7 @@ endif(${BUILD_ADDR2LINE})
# ================================================================================================ # ================================================================================================
# Build test # Build test
# ================================================================================================ # ================================================================================================
message("contrib tests:" ${BUILD_DEPENDENCY_TESTS})
if(${BUILD_DEPENDENCY_TESTS}) if(${BUILD_DEPENDENCY_TESTS})
add_subdirectory(test EXCLUDE_FROM_ALL) add_subdirectory(test EXCLUDE_FROM_ALL)
endif(${BUILD_DEPENDENCY_TESTS}) endif(${BUILD_DEPENDENCY_TESTS})
# rocksdb # rocksdb
message("contrib test dir:" ${BUILD_WITH_ROCKSDB})
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
add_subdirectory(rocksdb) add_subdirectory(rocksdb)
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
......
message("contrib test/rocksdb:" ${BUILD_DEPENDENCY_TESTS})
add_executable(rocksdbTest "") add_executable(rocksdbTest "")
target_sources(rocksdbTest target_sources(rocksdbTest
PRIVATE PRIVATE
......
...@@ -25,10 +25,12 @@ int main(int argc, char const *argv[]) { ...@@ -25,10 +25,12 @@ int main(int argc, char const *argv[]) {
// Read // Read
rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); //rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db));
char buf[256] = {0};
size_t vallen = 0; size_t vallen = 0;
char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err);
printf("val:%s\n", val); snprintf(buf, vallen+5, "val:%s", val);
printf("%ld %ld %s\n", strlen(val), vallen, buf);
// Update // Update
// rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err); // rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err);
......
...@@ -82,6 +82,7 @@ target_include_directories( ...@@ -82,6 +82,7 @@ target_include_directories(
PUBLIC "inc" PUBLIC "inc"
PUBLIC "src/inc" PUBLIC "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
) )
target_link_libraries( target_link_libraries(
vnode vnode
...@@ -98,6 +99,7 @@ target_link_libraries( ...@@ -98,6 +99,7 @@ target_link_libraries(
# PUBLIC bdb # PUBLIC bdb
# PUBLIC scalar # PUBLIC scalar
PUBLIC rocksdb
PUBLIC transport PUBLIC transport
PUBLIC stream PUBLIC stream
PUBLIC index PUBLIC index
......
...@@ -343,6 +343,14 @@ struct STsdbFS { ...@@ -343,6 +343,14 @@ struct STsdbFS {
SArray *aDFileSet; // SArray<SDFileSet> SArray *aDFileSet; // SArray<SDFileSet>
}; };
typedef struct {
rocksdb_t *db;
rocksdb_options_t *options;
rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions;
TdThreadMutex rMutex;
} SRocksCache;
struct STsdb { struct STsdb {
char *path; char *path;
SVnode *pVnode; SVnode *pVnode;
...@@ -355,6 +363,7 @@ struct STsdb { ...@@ -355,6 +363,7 @@ struct STsdb {
TdThreadMutex lruMutex; TdThreadMutex lruMutex;
SLRUCache *biCache; SLRUCache *biCache;
TdThreadMutex biMutex; TdThreadMutex biMutex;
SRocksCache rCache;
}; };
struct TSDBKEY { struct TSDBKEY {
...@@ -796,6 +805,7 @@ typedef struct { ...@@ -796,6 +805,7 @@ typedef struct {
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t uid, TSDBROW *row);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "executor.h" #include "executor.h"
#include "filter.h" #include "filter.h"
#include "qworker.h" #include "qworker.h"
#include "rocksdb/c.h"
#include "sync.h" #include "sync.h"
#include "tRealloc.h" #include "tRealloc.h"
#include "tchecksum.h" #include "tchecksum.h"
......
...@@ -43,6 +43,110 @@ static void tsdbCloseBICache(STsdb *pTsdb) { ...@@ -43,6 +43,110 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
} }
} }
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
SVnode *pVnode = pTsdb->pVnode;
if (pVnode->pTfs) {
if (path) {
snprintf(path, TSDB_FILENAME_LEN, "%s%s%s%scache.rdb", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
pTsdb->path, TD_DIRSEP);
}
} else {
if (path) {
snprintf(path, TSDB_FILENAME_LEN, "%s%scache.rdb", pTsdb->path, TD_DIRSEP);
}
}
}
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
int32_t code = 0;
rocksdb_options_t *options = rocksdb_options_create();
if (NULL == options) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_inplace_update_support(options, 1);
rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
if (NULL == writeoptions) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2;
}
rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
if (NULL == readoptions) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2;
}
char *err = NULL;
char cachePath[TSDB_FILENAME_LEN] = {0};
tsdbGetRocksPath(pTsdb, cachePath);
rocksdb_t *db = rocksdb_open(options, cachePath, &err);
if (NULL == db) {
code = -1;
goto _err3;
}
taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL);
pTsdb->rCache.options = options;
pTsdb->rCache.writeoptions = writeoptions;
pTsdb->rCache.readoptions = readoptions;
pTsdb->rCache.db = db;
return code;
_err4:
rocksdb_readoptions_destroy(readoptions);
_err3:
rocksdb_writeoptions_destroy(writeoptions);
_err2:
rocksdb_options_destroy(options);
_err:
return code;
}
static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_close(pTsdb->rCache.db);
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
rocksdb_options_destroy(pTsdb->rCache.options);
}
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t uid, TSDBROW *pRow) {
int32_t code = 0;
STSDBRowIter iter = {0};
tsdbRowIterOpen(&iter, pRow, pTSchema);
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
SColVal *pRColVal = tsdbCacheGetRColVal(pTsdb);
if (pRColVal) {
// merge pColVal with pRColVal
}
tsdbCachePutRColVal(pColVal);
}
tsdbRowClose(&iter);
char *err = NULL;
char buf[256] = {0};
size_t vallen = 0;
char *val = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, "key", 3, &vallen, &err);
if (val) {
} else {
}
rocksdb_put(pTsdb->rCache.db, pTsdb->rCache.writeoptions, "key", 3, "value", 5, &err);
return code;
}
int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
SLRUCache *pCache = NULL; SLRUCache *pCache = NULL;
...@@ -60,6 +164,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { ...@@ -60,6 +164,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err; goto _err;
} }
code = tsdbOpenRocksCache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false); taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->lruMutex, NULL); taosThreadMutexInit(&pTsdb->lruMutex, NULL);
...@@ -80,6 +190,7 @@ void tsdbCloseCache(STsdb *pTsdb) { ...@@ -80,6 +190,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
} }
tsdbCloseBICache(pTsdb); tsdbCloseBICache(pTsdb);
tsdbCloseRocksCache(pTsdb);
} }
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
......
...@@ -298,12 +298,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) { ...@@ -298,12 +298,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) {
return rowsNum; return rowsNum;
} }
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) { void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch); taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) { for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i]; STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) { while (pTbData) {
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); void *p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) { if (p == NULL) {
pTbData = pTbData->next; pTbData = pTbData->next;
continue; continue;
...@@ -668,15 +668,17 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -668,15 +668,17 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
if (key.ts >= pTbData->maxKey) { if (key.ts >= pTbData->maxKey) {
pTbData->maxKey = key.ts; pTbData->maxKey = key.ts;
/*
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) { if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true); tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true);
}*/
} }
} /*
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb); tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
} }
*/
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->uid, &lRow);
// SMemTable // SMemTable
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
...@@ -736,15 +738,17 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -736,15 +738,17 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
if (key.ts >= pTbData->maxKey) { if (key.ts >= pTbData->maxKey) {
pTbData->maxKey = key.ts; pTbData->maxKey = key.ts;
/*
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) { if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true); tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true);
}*/
} }
} /*
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb); tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
} }
*/
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->uid, &lRow);
// SMemTable // SMemTable
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册