diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 3581a4c0ff9ab31836ba72ea8c73fba2e9ea2e0f..cebe4e8204777926bf6aabf3352436595b84011d 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -81,6 +81,8 @@ const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); void* streamBackendInit(const char* path) { + uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; + qDebug("start to init stream backend at %s", path); SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); pHandle->list = tdListNew(sizeof(SCfComparator)); @@ -89,22 +91,23 @@ void* streamBackendInit(const char* path) { pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); - rocksdb_env_set_low_priority_background_threads(env, tsNumOfSnodeStreamThreads); - rocksdb_env_set_high_priority_background_threads(env, tsNumOfSnodeStreamThreads); - rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20); + int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2; + rocksdb_env_set_low_priority_background_threads(env, nBGThread); + rocksdb_env_set_high_priority_background_threads(env, nBGThread); + + rocksdb_cache_t* cache = rocksdb_cache_create_lru(dbMemLimit / 2); rocksdb_options_t* opts = rocksdb_options_create(); rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_options_set_max_total_wal_size(opts, 128 << 20); + rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 3); rocksdb_options_set_info_log_level(opts, 0); - uint32_t dbLimit = nextPow2(tsMaxStreamBackendCache); - rocksdb_options_set_db_write_buffer_size(opts, dbLimit << 20); - rocksdb_options_set_write_buffer_size(opts, (dbLimit << 20) / 2); + rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit); + rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2); pHandle->env = env; pHandle->dbOpt = opts;