diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 4013957026168e24844be51aa303d71edc940286..dfa2cfe8accc5b9fbf40de2d285c2d4883672161 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -236,7 +236,7 @@ if(${BUILD_WITH_ROCKSDB}) endif(${TD_WINDOWS}) - if(${TD_DARWIN} OR ${TD_WINDOWS}) + if(${TD_DARWIN}) option(HAVE_THREAD_LOCAL "" OFF) option(WITH_IOSTATS_CONTEXT "" OFF) option(WITH_PERF_CONTEXT "" OFF) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index e95d76aa23e58f81dbe0d748178126b2277431e3..76742ae39c9855e2fe85fd44496bc84f5744cde3 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -362,7 +362,10 @@ const char* compareFuncKeyName(void* name); const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); -void destroyFunc(void* stata) { return; } +void destroyFunc(void* arg) { + (void)arg; + return; +} typedef struct { const char* key; @@ -434,13 +437,34 @@ SCfInit ginitDict[] = { encodeValueFunc, decodeValueFunc}, }; -const char* compareDefaultName(void* name) { return ginitDict[0].key; } -const char* compareStateName(void* name) { return ginitDict[1].key; } -const char* compareWinKeyName(void* name) { return ginitDict[2].key; } -const char* compareSessionKeyName(void* name) { return ginitDict[3].key; } -const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } -const char* compareParKeyName(void* name) { return ginitDict[5].key; } -const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } +const char* compareDefaultName(void* arg) { + (void)arg; + return ginitDict[0].key; +} +const char* compareStateName(void* arg) { + (void)arg; + return ginitDict[1].key; +} +const char* compareWinKeyName(void* arg) { + (void)arg; + return ginitDict[2].key; +} +const char* compareSessionKeyName(void* arg) { + (void)arg; + return ginitDict[3].key; +} +const char* compareFuncKeyName(void* arg) { + (void)arg; + return ginitDict[4].key; +} +const char* compareParKeyName(void* arg) { + (void)arg; + return ginitDict[5].key; +} +const char* comparePartagKeyName(void* arg) { + (void)arg; + return ginitDict[6].key; +} typedef struct SCompactFilteFactory { void* status; @@ -454,9 +478,7 @@ const char* compactFilteFactoryName(void* arg) { return "stream_compact_filter"; } -void destroyCompactFilte(void* arg) { - if (arg == NULL) return; -} +void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { int64_t unixTime = taosGetTimestampMs(); @@ -485,8 +507,8 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_t* opts = rocksdb_options_create(); rocksdb_options_set_env(opts, env); // rocksdb_options_increase_parallelism(opts, 8); - // rocksdb_options_optimize_level_style_compaction(opts, 0); - // create the DB if it's not already present + // rocksdb_options_optimize_level_style_compaction(opts, 0); + // create the DB if it's not already present rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); rocksdb_options_set_write_buffer_size(opts, 128 << 20); @@ -494,6 +516,7 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_compactionfilterfactory_t* factory = rocksdb_compactionfilterfactory_create( NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); rocksdb_options_set_compaction_filter_factory(opts, factory); + rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); char* err = NULL; @@ -570,15 +593,12 @@ void streamCleanBackend(SStreamState* pState) { rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); rocksdb_block_based_options_destroy(param[i].tableOpt); - // rocksdb_compactionfilterfactory_destroy(param[i].filteFactory); } rocksdb_cache_destroy(pState->pTdbState->cache); taosMemoryFreeClear(pState->pTdbState->cfOpts); taosMemoryFree(pState->pTdbState->pCompare); taosMemoryFree(pState->pTdbState->param); rocksdb_env_destroy(pState->pTdbState->env); - rocksdb_compactionfilterfactory_destroy(pState->pTdbState->compactFactory); - pState->pTdbState->rocksdb = NULL; } @@ -612,12 +632,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); } - rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); *readOpt = rOpt; - // rocksdb_readoptions_set_snapshot(rOpt, *snapshot); - + rocksdb_readoptions_set_snapshot(rOpt, *snapshot); rocksdb_readoptions_set_fill_cache(rOpt, 0); return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); @@ -690,6 +708,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa } else { \ taosMemoryFree(p); \ } \ + taosMemoryFree(val); \ if (vLen != NULL) *vLen = len; \ } \ if (err != NULL) { \