diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 6103262e1775e94a780901f2764984a5e913b834..7c4aab2491f411bd1196ea8b425d85faf9082360 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -38,6 +38,8 @@ typedef struct STdbState { rocksdb_comparator_t** pCompare; rocksdb_options_t* dbOpt; struct SStreamTask* pOwner; + void* param; + void* env; TDB* db; TTB* pStateDb; diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 136e54fd94bb64853403bd551232bd96ad2525e1..5e8c406697612f5bf8a1145519b7cdc38c9200f6 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "tcommon.h" #include "tlog.h" @@ -280,6 +281,10 @@ int parKeyToString(void* k, char* buf) { return n; } +typedef struct { + void* tableOpt; + void* lru; // global or not +} rocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; typedef int (*EncodeFunc)(void* key, char* buf); @@ -354,6 +359,7 @@ int streamInitBackend(SStreamState* pState, char* path) { char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + rocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(rocksdbCfParam)); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); for (int i = 0; i < cfLen; i++) { cfOpt[i] = rocksdb_options_create_copy(opts); @@ -367,6 +373,8 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); + param[i].tableOpt = tableOpt; + param[i].lru = cache; // rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8); // rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans); }; @@ -391,6 +399,8 @@ int streamInitBackend(SStreamState* pState, char* path) { pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; pState->pTdbState->pCompare = pCompare; pState->pTdbState->dbOpt = opts; + pState->pTdbState->param = param; + pState->pTdbState->env = env; return 0; } void streamCleanBackend(SStreamState* pState) { @@ -398,12 +408,17 @@ void streamCleanBackend(SStreamState* pState) { qInfo("rocksdb already free"); return; } - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + rocksdbCfParam* param = pState->pTdbState->param; for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); + + rocksdb_cache_destroy(param[i].lru); + rocksdb_block_based_options_destroy(param[i].tableOpt); } + taosMemoryFree(pState->pTdbState->param); rocksdb_options_destroy(pState->pTdbState->dbOpt); taosMemoryFreeClear(pState->pTdbState->pHandle); @@ -417,6 +432,7 @@ void streamCleanBackend(SStreamState* pState) { pState->pTdbState->readOpts = NULL; rocksdb_close(pState->pTdbState->rocksdb); + rocksdb_env_destroy(pState->pTdbState->env); pState->pTdbState->rocksdb = NULL; }