diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 8f943133820bfa45a963ad9c36c48ddf60756a98..6cd6f79d8b34dc1bb411bd9a0383dc7ab51ef896 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -438,6 +438,7 @@ endif(${BUILD_ADDR2LINE}) # ================================================================================================ # Build test # ================================================================================================ +message("contrib tests:" ${BUILD_DEPENDENCY_TESTS}) if(${BUILD_DEPENDENCY_TESTS}) add_subdirectory(test EXCLUDE_FROM_ALL) endif(${BUILD_DEPENDENCY_TESTS}) diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index f35cf0d13d7078e6e90f52f3b49a0c579f3d856b..911eca50a7b1ea6f26b4e5cbec50752d844c8820 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -1,4 +1,6 @@ # rocksdb +message("contrib test dir:" ${BUILD_WITH_ROCKSDB}) + if(${BUILD_WITH_ROCKSDB}) add_subdirectory(rocksdb) endif(${BUILD_WITH_ROCKSDB}) diff --git a/contrib/test/rocksdb/CMakeLists.txt b/contrib/test/rocksdb/CMakeLists.txt index b500e0a66158ba33d52d351f4e34462b27771892..5ae1b0395b5c2a4b323c07de19d1298579cfcc3f 100644 --- a/contrib/test/rocksdb/CMakeLists.txt +++ b/contrib/test/rocksdb/CMakeLists.txt @@ -1,6 +1,8 @@ +message("contrib test/rocksdb:" ${BUILD_DEPENDENCY_TESTS}) + add_executable(rocksdbTest "") target_sources(rocksdbTest PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/main.c" ) -target_link_libraries(rocksdbTest rocksdb) \ No newline at end of file +target_link_libraries(rocksdbTest rocksdb) diff --git a/contrib/test/rocksdb/main.c b/contrib/test/rocksdb/main.c index d1cbd373da1f8af09b6f46fbfbeb9da27dd43678..09435790cc3f1b0a05b0fe3db73e708243cfaa8f 100644 --- a/contrib/test/rocksdb/main.c +++ b/contrib/test/rocksdb/main.c @@ -25,10 +25,12 @@ int main(int argc, char const *argv[]) { // Read rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); - rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); +//rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); + char buf[256] = {0}; size_t vallen = 0; char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); - printf("val:%s\n", val); + snprintf(buf, vallen+5, "val:%s", val); + printf("%ld %ld %s\n", strlen(val), vallen, buf); // Update // rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err); @@ -43,4 +45,4 @@ int main(int argc, char const *argv[]) { rocksdb_close(db); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 9911752f8e23827b7f3700118d499601569e5980..52df20877a68ccde0c43e54de4c8e5b45a31a791 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -82,6 +82,7 @@ target_include_directories( PUBLIC "inc" PUBLIC "src/inc" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" + PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" ) target_link_libraries( vnode @@ -98,6 +99,7 @@ target_link_libraries( # PUBLIC bdb # PUBLIC scalar + PUBLIC rocksdb PUBLIC transport PUBLIC stream PUBLIC index diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2a85b191a42839a580da6ff8b65ee4e9c65430b5..b698909af1268bf5e6c216ab0324b45a7479e9ca 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -343,6 +343,14 @@ struct STsdbFS { SArray *aDFileSet; // SArray }; +typedef struct { + rocksdb_t *db; + rocksdb_options_t *options; + rocksdb_writeoptions_t *writeoptions; + rocksdb_readoptions_t *readoptions; + TdThreadMutex rMutex; +} SRocksCache; + struct STsdb { char *path; SVnode *pVnode; @@ -355,6 +363,7 @@ struct STsdb { TdThreadMutex lruMutex; SLRUCache *biCache; TdThreadMutex biMutex; + SRocksCache rCache; }; struct TSDBKEY { @@ -796,6 +805,7 @@ typedef struct { int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); +int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t uid, TSDBROW *row); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 253d5aebcec2353c1e951c5399256a23681f6850..a3f2c4559f94e36efd68bca259bcfee85650774c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -19,6 +19,7 @@ #include "executor.h" #include "filter.h" #include "qworker.h" +#include "rocksdb/c.h" #include "sync.h" #include "tRealloc.h" #include "tchecksum.h" diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 48d33712845503aa8b69911dff837a81968ea319..171a63aa33685f7bb98cb77bf5eaabe5faf244fb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -43,6 +43,110 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } +static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { + SVnode *pVnode = pTsdb->pVnode; + if (pVnode->pTfs) { + if (path) { + snprintf(path, TSDB_FILENAME_LEN, "%s%s%s%scache.rdb", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + } + } else { + if (path) { + snprintf(path, TSDB_FILENAME_LEN, "%s%scache.rdb", pTsdb->path, TD_DIRSEP); + } + } +} + +static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { + int32_t code = 0; + + rocksdb_options_t *options = rocksdb_options_create(); + if (NULL == options) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + rocksdb_options_set_create_if_missing(options, 1); + rocksdb_options_set_inplace_update_support(options, 1); + rocksdb_options_set_allow_concurrent_memtable_write(options, 0); + + rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); + if (NULL == writeoptions) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err2; + } + + rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); + if (NULL == readoptions) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err2; + } + + char *err = NULL; + char cachePath[TSDB_FILENAME_LEN] = {0}; + tsdbGetRocksPath(pTsdb, cachePath); + + rocksdb_t *db = rocksdb_open(options, cachePath, &err); + if (NULL == db) { + code = -1; + goto _err3; + } + + taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); + + pTsdb->rCache.options = options; + pTsdb->rCache.writeoptions = writeoptions; + pTsdb->rCache.readoptions = readoptions; + pTsdb->rCache.db = db; + + return code; + +_err4: + rocksdb_readoptions_destroy(readoptions); +_err3: + rocksdb_writeoptions_destroy(writeoptions); +_err2: + rocksdb_options_destroy(options); +_err: + return code; +} + +static void tsdbCloseRocksCache(STsdb *pTsdb) { + rocksdb_close(pTsdb->rCache.db); + rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); + rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); + rocksdb_options_destroy(pTsdb->rCache.options); +} + +int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t uid, TSDBROW *pRow) { + int32_t code = 0; + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, pRow, pTSchema); + + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { + SColVal *pRColVal = tsdbCacheGetRColVal(pTsdb); + if (pRColVal) { + // merge pColVal with pRColVal + } + + tsdbCachePutRColVal(pColVal); + } + + tsdbRowClose(&iter); + + char *err = NULL; + char buf[256] = {0}; + size_t vallen = 0; + char *val = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, "key", 3, &vallen, &err); + if (val) { + } else { + } + rocksdb_put(pTsdb->rCache.db, pTsdb->rCache.writeoptions, "key", 3, "value", 5, &err); + + return code; +} + int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t code = 0; SLRUCache *pCache = NULL; @@ -60,6 +164,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } + code = tsdbOpenRocksCache(pTsdb); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + taosLRUCacheSetStrictCapacity(pCache, false); taosThreadMutexInit(&pTsdb->lruMutex, NULL); @@ -80,6 +190,7 @@ void tsdbCloseCache(STsdb *pTsdb) { } tsdbCloseBICache(pTsdb); + tsdbCloseRocksCache(pTsdb); } static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d0ff403bf70a96f114b0722bfb13f7ce035edfb4..f480fb6081d78305c3215f330fbcce03f87bf5bd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -284,8 +284,8 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { int64_t tsdbCountTbDataRows(STbData *pTbData) { SMemSkipListNode *pNode = pTbData->sl.pHead; - int64_t rowsNum = 0; - + int64_t rowsNum = 0; + while (NULL != pNode) { pNode = SL_GET_NODE_FORWARD(pNode, 0); if (pNode == pTbData->sl.pTail) { @@ -298,17 +298,17 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) { return rowsNum; } -void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) { +void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum) { taosRLockLatch(&pMemTable->latch); for (int32_t i = 0; i < pMemTable->nBucket; ++i) { STbData *pTbData = pMemTable->aBucket[i]; while (pTbData) { - void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); + void *p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); if (p == NULL) { pTbData = pTbData->next; continue; } - + *rowsNum += tsdbCountTbDataRows(pTbData); pTbData = pTbData->next; } @@ -668,15 +668,17 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.ts >= pTbData->maxKey) { pTbData->maxKey = key.ts; - + /* if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) { tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true); - } + }*/ } - + /* if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb); } + */ + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->uid, &lRow); // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); @@ -736,15 +738,17 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.ts >= pTbData->maxKey) { pTbData->maxKey = key.ts; - + /* if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) { tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true); - } + }*/ } - + /* if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb); } + */ + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->uid, &lRow); // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);