From a3836b2363958598b0373403c4c1514fc3b9e704 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Apr 2023 12:52:03 +0000 Subject: [PATCH] fix invalid free --- include/libs/stream/streamState.h | 2 ++ source/libs/stream/src/streamStateRocksdb.c | 18 +++++++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 6103262e17..7c4aab2491 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -38,6 +38,8 @@ typedef struct STdbState { rocksdb_comparator_t** pCompare; rocksdb_options_t* dbOpt; struct SStreamTask* pOwner; + void* param; + void* env; TDB* db; TTB* pStateDb; diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 136e54fd94..5e8c406697 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "tcommon.h" #include "tlog.h" @@ -280,6 +281,10 @@ int parKeyToString(void* k, char* buf) { return n; } +typedef struct { + void* tableOpt; + void* lru; // global or not +} rocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; typedef int (*EncodeFunc)(void* key, char* buf); @@ -354,6 +359,7 @@ int streamInitBackend(SStreamState* pState, char* path) { char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + rocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(rocksdbCfParam)); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); for (int i = 0; i < cfLen; i++) { cfOpt[i] = rocksdb_options_create_copy(opts); @@ -367,6 +373,8 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); + param[i].tableOpt = tableOpt; + param[i].lru = cache; // rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8); // rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans); }; @@ -391,6 +399,8 @@ int streamInitBackend(SStreamState* pState, char* path) { pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; pState->pTdbState->pCompare = pCompare; pState->pTdbState->dbOpt = opts; + pState->pTdbState->param = param; + pState->pTdbState->env = env; return 0; } void streamCleanBackend(SStreamState* pState) { @@ -398,12 +408,17 @@ void streamCleanBackend(SStreamState* pState) { qInfo("rocksdb already free"); return; } - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + rocksdbCfParam* param = pState->pTdbState->param; for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); + + rocksdb_cache_destroy(param[i].lru); + rocksdb_block_based_options_destroy(param[i].tableOpt); } + taosMemoryFree(pState->pTdbState->param); rocksdb_options_destroy(pState->pTdbState->dbOpt); taosMemoryFreeClear(pState->pTdbState->pHandle); @@ -417,6 +432,7 @@ void streamCleanBackend(SStreamState* pState) { pState->pTdbState->readOpts = NULL; rocksdb_close(pState->pTdbState->rocksdb); + rocksdb_env_destroy(pState->pTdbState->env); pState->pTdbState->rocksdb = NULL; } -- GitLab