diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 157e37f080bd730c0a08f1bc622a6c0089eb50cd..ced94e0f11a4410e971c4ca8f1ccf71a05041c46 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -29,7 +29,6 @@ extern "C" { #define SLOW_LOG_TYPE_OTHERS 0x4 #define SLOW_LOG_TYPE_ALL 0xFFFFFFFF - // cluster extern char tsFirst[]; extern char tsSecond[]; @@ -181,6 +180,7 @@ extern bool tsDisableStream; extern int64_t tsStreamBufferSize; extern int64_t tsCheckpointInterval; extern bool tsFilterScalarMode; +extern int32_t tsMaxStreamBackendCache; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c648f8551ae8bd8948b618a3c3671aa50d469299..c5646d92d1ee23000e5848129a81249c16737dd8 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,6 +60,7 @@ int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; +int32_t tsMaxStreamBackendCache = 128; // M // sync raft int32_t tsElectInterval = 25 * 1000; @@ -105,7 +106,7 @@ int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT bool tsEnableQueryHb = false; -bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true +bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true int32_t tsQuerySmaOptimize = 0; int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. bool tsQueryPlannerTrace = false; @@ -117,8 +118,8 @@ int32_t tsRedirectFactor = 2; int32_t tsRedirectMaxPeriod = 1000; int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; -int32_t tsMetaCacheMaxSize = -1; // MB -int32_t tsSlowLogThreshold = 3; // seconds +int32_t tsMetaCacheMaxSize = -1; // MB +int32_t tsSlowLogThreshold = 3; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; /* @@ -349,7 +350,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1; if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; - if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1; + if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) + return -1; if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1; if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, true) != 0) return -1; if (cfgAddString(pCfg, "slowLogScope", "", true) != 0) return -1; @@ -524,6 +526,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1; if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1; GRANT_CFG_ADD; return 0; @@ -781,7 +784,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32; tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval; - tsEnableScience = cfgGetItem(pCfg, "enableScience")->bval; + tsEnableScience = cfgGetItem(pCfg, "enableScience")->bval; tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32; @@ -902,7 +905,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64; tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; - + tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; + GRANT_CFG_GET; return 0; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f7638a42ae7d79addd638f68b8601bdbc9161579..c8a6597badcbe1fa5124bbe8be9dd160516198f2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -38,6 +38,15 @@ typedef struct { rocksdb_comparator_t** pCompares; } RocksdbCfInst; +uint32_t nextPow2(uint32_t x) { + x = x - 1; + x = x | (x >> 1); + x = x | (x >> 2); + x = x | (x >> 4); + x = x | (x >> 8); + x = x | (x >> 16); + return x + 1; +} int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); @@ -92,6 +101,8 @@ void* streamBackendInit(const char* path) { rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 2); rocksdb_options_set_info_log_level(opts, 0); + uint32_t dbLimit = nextPow2(tsMaxStreamBackendCache); + rocksdb_options_set_db_write_buffer_size(opts, dbLimit << 20); pHandle->env = env; pHandle->dbOpt = opts;