提交 e4652d5a 编写于 作者: S Shengliang Guan

adjust thread nums

上级 5eb0b463
...@@ -41,8 +41,6 @@ extern int32_t tsMaxConnections; ...@@ -41,8 +41,6 @@ extern int32_t tsMaxConnections;
extern int32_t tsMaxShellConns; extern int32_t tsMaxShellConns;
extern int32_t tsShellActivityTimer; extern int32_t tsShellActivityTimer;
extern int32_t tsMaxTmrCtrl; extern int32_t tsMaxTmrCtrl;
extern float tsNumOfThreadsPerCore;
extern int32_t tsNumOfCommitThreads;
extern float tsRatioOfQueryCores; extern float tsRatioOfQueryCores;
extern int32_t tsCompressMsgSize; extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData; extern int32_t tsCompressColData;
...@@ -60,6 +58,11 @@ extern int32_t tsQnodeShmSize; ...@@ -60,6 +58,11 @@ extern int32_t tsQnodeShmSize;
extern int32_t tsSnodeShmSize; extern int32_t tsSnodeShmSize;
extern int32_t tsBnodeShmSize; extern int32_t tsBnodeShmSize;
// queue & threads
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfCommitThreads;
extern int32_t tsNumOfTaskQueueThreads;
// monitor // monitor
extern bool tsEnableMonitor; extern bool tsEnableMonitor;
extern int32_t tsMonitorInterval; extern int32_t tsMonitorInterval;
......
...@@ -56,6 +56,12 @@ extern "C" { ...@@ -56,6 +56,12 @@ extern "C" {
__typeof(b) __b = (b); \ __typeof(b) __b = (b); \
(__a < __b) ? __a : __b; \ (__a < __b) ? __a : __b; \
}) })
#define TRANGE(a, b, c) \
({ \
a = TMAX(a, b); \
a = TMIN(a, c); \
})
#endif #endif
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -30,21 +30,19 @@ char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port ...@@ -30,21 +30,19 @@ char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030; uint16_t tsServerPort = 6030;
int32_t tsVersion = 30000000; int32_t tsVersion = 30000000;
int32_t tsStatusInterval = 1; // second int32_t tsStatusInterval = 1; // second
bool tsEnableTelemetryReporting = 0; bool tsEnableTelemetryReporting = false;
// common // common
int32_t tsRpcTimer = 300; int32_t tsRpcTimer = 300;
int32_t tsRpcMaxTime = 600; // seconds; int32_t tsRpcMaxTime = 600; // seconds;
bool tsRpcForceTcp = 1; // disable this, means query, show command use udp protocol as default bool tsRpcForceTcp = true; // disable this, means query, show command use udp protocol as default
int32_t tsMaxShellConns = 50000; int32_t tsMaxShellConns = 50000;
int32_t tsMaxConnections = 50000; int32_t tsMaxConnections = 50000;
int32_t tsShellActivityTimer = 3; // second int32_t tsShellActivityTimer = 3; // second
float tsNumOfThreadsPerCore = 1.0f;
int32_t tsNumOfCommitThreads = 4;
float tsRatioOfQueryCores = 1.0f; float tsRatioOfQueryCores = 1.0f;
int32_t tsMaxBinaryDisplayWidth = 30; int32_t tsMaxBinaryDisplayWidth = 30;
bool tsEnableSlaveQuery = 1; bool tsEnableSlaveQuery = true;
bool tsPrintAuth = 0; bool tsPrintAuth = false;
// multi process // multi process
bool tsMultiProcess = false; bool tsMultiProcess = false;
...@@ -54,8 +52,13 @@ int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4; ...@@ -54,8 +52,13 @@ int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4; int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4; int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
// queue & threads
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 1;
// monitor // monitor
bool tsEnableMonitor = 1; bool tsEnableMonitor = true;
int32_t tsMonitorInterval = 30; int32_t tsMonitorInterval = 30;
char tsMonitorFqdn[TSDB_FQDN_LEN] = {0}; char tsMonitorFqdn[TSDB_FQDN_LEN] = {0};
uint16_t tsMonitorPort = 6043; uint16_t tsMonitorPort = 6043;
...@@ -124,13 +127,13 @@ int32_t tsQueryBufferSize = -1; ...@@ -124,13 +127,13 @@ int32_t tsQueryBufferSize = -1;
int64_t tsQueryBufferSizeBytes = -1; int64_t tsQueryBufferSizeBytes = -1;
// in retrieve blocking model, the retrieve threads will wait for the completion of the query processing. // in retrieve blocking model, the retrieve threads will wait for the completion of the query processing.
bool tsRetrieveBlockingModel = 0; bool tsRetrieveBlockingModel = false;
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name // last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
bool tsKeepOriginalColumnName = 0; bool tsKeepOriginalColumnName = false;
// kill long query // kill long query
bool tsDeadLockKillQuery = 0; bool tsDeadLockKillQuery = false;
// tsdb config // tsdb config
// For backward compatibility // For backward compatibility
...@@ -290,7 +293,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -290,7 +293,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1; if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1; if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1; if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "numOfThreadsPerCore", tsNumOfThreadsPerCore, 0, 10, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "maxTmrCtrl", tsMaxTmrCtrl, 8, 2048, 1) != 0) return -1; if (cfgAddInt32(pCfg, "maxTmrCtrl", tsMaxTmrCtrl, 8, 2048, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcTimer", tsRpcTimer, 100, 3000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "rpcTimer", tsRpcTimer, 100, 3000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcMaxTime", tsRpcMaxTime, 100, 7200, 1) != 0) return -1; if (cfgAddInt32(pCfg, "rpcMaxTime", tsRpcMaxTime, 100, 7200, 1) != 0) return -1;
...@@ -304,6 +306,11 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -304,6 +306,11 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
return -1; return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "maxBinaryDisplayWidth", tsMaxBinaryDisplayWidth, 1, 65536, 1) != 0) return -1; if (cfgAddInt32(pCfg, "maxBinaryDisplayWidth", tsMaxBinaryDisplayWidth, 1, 65536, 1) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 4;
tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2);
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 1, 1024, 0) != 0) return -1;
return 0; return 0;
} }
...@@ -336,7 +343,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -336,7 +343,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "supportVnodes", 256, 0, 65536, 0) != 0) return -1; if (cfgAddInt32(pCfg, "supportVnodes", 256, 0, 65536, 0) != 0) return -1;
if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1; if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1; if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 100, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "ratioOfQueryCores", tsRatioOfQueryCores, 0, 2, 0) != 0) return -1; if (cfgAddFloat(pCfg, "ratioOfQueryCores", tsRatioOfQueryCores, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelemetryReporting, 0) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelemetryReporting, 0) != 0) return -1;
...@@ -360,7 +366,15 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -360,7 +366,15 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
// if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
tsNumOfCommitThreads = tsNumOfCommitThreads / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 100, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
...@@ -424,7 +438,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -424,7 +438,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
return -1; return -1;
} }
tsNumOfThreadsPerCore = cfgGetItem(pCfg, "numOfThreadsPerCore")->fval;
tsMaxTmrCtrl = cfgGetItem(pCfg, "maxTmrCtrl")->i32; tsMaxTmrCtrl = cfgGetItem(pCfg, "maxTmrCtrl")->i32;
tsRpcTimer = cfgGetItem(pCfg, "rpcTimer")->i32; tsRpcTimer = cfgGetItem(pCfg, "rpcTimer")->i32;
tsRpcMaxTime = cfgGetItem(pCfg, "rpcMaxTime")->i32; tsRpcMaxTime = cfgGetItem(pCfg, "rpcMaxTime")->i32;
...@@ -437,7 +450,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -437,7 +450,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsMaxNumOfOrderedResults = cfgGetItem(pCfg, "maxNumOfOrderedRes")->i32; tsMaxNumOfOrderedResults = cfgGetItem(pCfg, "maxNumOfOrderedRes")->i32;
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsMaxBinaryDisplayWidth = cfgGetItem(pCfg, "maxBinaryDisplayWidth")->i32; tsMaxBinaryDisplayWidth = cfgGetItem(pCfg, "maxBinaryDisplayWidth")->i32;
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
return 0; return 0;
} }
...@@ -461,7 +474,6 @@ static void taosSetSystemCfg(SConfig *pCfg) { ...@@ -461,7 +474,6 @@ static void taosSetSystemCfg(SConfig *pCfg) {
static int32_t taosSetServerCfg(SConfig *pCfg) { static int32_t taosSetServerCfg(SConfig *pCfg) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsRatioOfQueryCores = cfgGetItem(pCfg, "ratioOfQueryCores")->fval; tsRatioOfQueryCores = cfgGetItem(pCfg, "ratioOfQueryCores")->fval;
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
tsEnableTelemetryReporting = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableTelemetryReporting = cfgGetItem(pCfg, "telemetryReporting")->bval;
...@@ -485,7 +497,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -485,7 +497,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32; tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32;
tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32; tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32;
tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32; tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32;
// tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32; tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32;
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
......
...@@ -257,16 +257,11 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch ...@@ -257,16 +257,11 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch
static int32_t dndInitServer(SDnode *pDnode) { static int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
}
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pDnode->serverPort; rpcInit.localPort = pDnode->serverPort;
rpcInit.label = "DND"; rpcInit.label = "DND";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.cfp = (RpcCfp)dndProcessMsg; rpcInit.cfp = (RpcCfp)dndProcessMsg;
rpcInit.sessions = tsMaxShellConns; rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
......
...@@ -97,18 +97,14 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag ...@@ -97,18 +97,14 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
static void* pTaskQueue = NULL; static void* pTaskQueue = NULL;
int32_t initTaskQueue() { int32_t initTaskQueue() {
double factor = 4.0;
int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
int32_t queueSize = tsMaxConnections * 2; int32_t queueSize = tsMaxConnections * 2;
pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc"); pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc");
if (NULL == pTaskQueue) { if (NULL == pTaskQueue) {
qError("failed to init task queue"); qError("failed to init task queue");
return -1; return -1;
} }
qDebug("task queue is initialized, numOfThreads: %d", numOfThreads); qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册