diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 94f294d2bfd09a49ccd7c673ce30ed5310b1679b..0633bb82013ae365b39c11c0626e6a9c2a74d92f 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -41,8 +41,6 @@ extern int32_t tsMaxConnections; extern int32_t tsMaxShellConns; extern int32_t tsShellActivityTimer; extern int32_t tsMaxTmrCtrl; -extern float tsNumOfThreadsPerCore; -extern int32_t tsNumOfCommitThreads; extern float tsRatioOfQueryCores; extern int32_t tsCompressMsgSize; extern int32_t tsCompressColData; @@ -60,6 +58,11 @@ extern int32_t tsQnodeShmSize; extern int32_t tsSnodeShmSize; extern int32_t tsBnodeShmSize; +// queue & threads +extern int32_t tsNumOfRpcThreads; +extern int32_t tsNumOfCommitThreads; +extern int32_t tsNumOfTaskQueueThreads; + // monitor extern bool tsEnableMonitor; extern int32_t tsMonitorInterval; diff --git a/include/os/osMath.h b/include/os/osMath.h index 3fe46d557ed348a934a32097f1aca74ba4b2e550..0b5e4bd93ceeb5f97064b4d011675a5e15fdbd70 100644 --- a/include/os/osMath.h +++ b/include/os/osMath.h @@ -56,6 +56,12 @@ extern "C" { __typeof(b) __b = (b); \ (__a < __b) ? __a : __b; \ }) + +#define TRANGE(a, b, c) \ + ({ \ + a = TMAX(a, b); \ + a = TMIN(a, c); \ + }) #endif #ifdef __cplusplus diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 97306f2da06051f1bfe598e2df27d9021c4df988..af30acf141dea0eb8266d94643acc6354f0075f0 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -30,21 +30,19 @@ char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port uint16_t tsServerPort = 6030; int32_t tsVersion = 30000000; int32_t tsStatusInterval = 1; // second -bool tsEnableTelemetryReporting = 0; +bool tsEnableTelemetryReporting = false; // common int32_t tsRpcTimer = 300; -int32_t tsRpcMaxTime = 600; // seconds; -bool tsRpcForceTcp = 1; // disable this, means query, show command use udp protocol as default +int32_t tsRpcMaxTime = 600; // seconds; +bool tsRpcForceTcp = true; // disable this, means query, show command use udp protocol as default int32_t tsMaxShellConns = 50000; int32_t tsMaxConnections = 50000; int32_t tsShellActivityTimer = 3; // second -float tsNumOfThreadsPerCore = 1.0f; -int32_t tsNumOfCommitThreads = 4; float tsRatioOfQueryCores = 1.0f; int32_t tsMaxBinaryDisplayWidth = 30; -bool tsEnableSlaveQuery = 1; -bool tsPrintAuth = 0; +bool tsEnableSlaveQuery = true; +bool tsPrintAuth = false; // multi process bool tsMultiProcess = false; @@ -54,8 +52,13 @@ int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4; int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4; int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4; +// queue & threads +int32_t tsNumOfRpcThreads = 1; +int32_t tsNumOfCommitThreads = 2; +int32_t tsNumOfTaskQueueThreads = 1; + // monitor -bool tsEnableMonitor = 1; +bool tsEnableMonitor = true; int32_t tsMonitorInterval = 30; char tsMonitorFqdn[TSDB_FQDN_LEN] = {0}; uint16_t tsMonitorPort = 6043; @@ -124,13 +127,13 @@ int32_t tsQueryBufferSize = -1; int64_t tsQueryBufferSizeBytes = -1; // in retrieve blocking model, the retrieve threads will wait for the completion of the query processing. -bool tsRetrieveBlockingModel = 0; +bool tsRetrieveBlockingModel = false; // last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name -bool tsKeepOriginalColumnName = 0; +bool tsKeepOriginalColumnName = false; // kill long query -bool tsDeadLockKillQuery = 0; +bool tsDeadLockKillQuery = false; // tsdb config // For backward compatibility @@ -290,7 +293,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1; if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1; if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1; - if (cfgAddFloat(pCfg, "numOfThreadsPerCore", tsNumOfThreadsPerCore, 0, 10, 1) != 0) return -1; if (cfgAddInt32(pCfg, "maxTmrCtrl", tsMaxTmrCtrl, 8, 2048, 1) != 0) return -1; if (cfgAddInt32(pCfg, "rpcTimer", tsRpcTimer, 100, 3000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "rpcMaxTime", tsRpcMaxTime, 100, 7200, 1) != 0) return -1; @@ -304,6 +306,11 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddInt32(pCfg, "maxBinaryDisplayWidth", tsMaxBinaryDisplayWidth, 1, 65536, 1) != 0) return -1; + + tsNumOfTaskQueueThreads = tsNumOfCores / 4; + tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2); + if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 1, 1024, 0) != 0) return -1; + return 0; } @@ -336,7 +343,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "supportVnodes", 256, 0, 65536, 0) != 0) return -1; if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1; if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 100, 0) != 0) return -1; if (cfgAddFloat(pCfg, "ratioOfQueryCores", tsRatioOfQueryCores, 0, 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelemetryReporting, 0) != 0) return -1; @@ -360,7 +366,15 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; - // if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; + + tsNumOfRpcThreads = tsNumOfCores / 2; + tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4); + if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfCommitThreads = tsNumOfCommitThreads / 2; + tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); + if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 100, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; @@ -424,7 +438,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { return -1; } - tsNumOfThreadsPerCore = cfgGetItem(pCfg, "numOfThreadsPerCore")->fval; tsMaxTmrCtrl = cfgGetItem(pCfg, "maxTmrCtrl")->i32; tsRpcTimer = cfgGetItem(pCfg, "rpcTimer")->i32; tsRpcMaxTime = cfgGetItem(pCfg, "rpcMaxTime")->i32; @@ -437,7 +450,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsMaxNumOfOrderedResults = cfgGetItem(pCfg, "maxNumOfOrderedRes")->i32; tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; tsMaxBinaryDisplayWidth = cfgGetItem(pCfg, "maxBinaryDisplayWidth")->i32; - + tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32; return 0; } @@ -461,7 +474,6 @@ static void taosSetSystemCfg(SConfig *pCfg) { static int32_t taosSetServerCfg(SConfig *pCfg) { tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; - tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsRatioOfQueryCores = cfgGetItem(pCfg, "ratioOfQueryCores")->fval; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsEnableTelemetryReporting = cfgGetItem(pCfg, "telemetryReporting")->bval; @@ -485,7 +497,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32; tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32; tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32; - // tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32; + tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32; + + tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; + tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/main/dndTransport.c b/source/dnode/mgmt/main/dndTransport.c index f3065bdcad6d422b09407c93cc381c0309fea65c..e76633bb1f1050cfeb1e7777173ccc5f9d271d1a 100644 --- a/source/dnode/mgmt/main/dndTransport.c +++ b/source/dnode/mgmt/main/dndTransport.c @@ -257,16 +257,11 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch static int32_t dndInitServer(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; - int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); - if (numOfThreads < 1) { - numOfThreads = 1; - } - SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = pDnode->serverPort; rpcInit.label = "DND"; - rpcInit.numOfThreads = numOfThreads; + rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = (RpcCfp)dndProcessMsg; rpcInit.sessions = tsMaxShellConns; rpcInit.connType = TAOS_CONN_SERVER; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 4ac19294aa4e0752eb0831c023a4905d8cde5b21..288d2e5f766c2696e473ef3a4067c74f0d3b9eae 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -97,18 +97,14 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag static void* pTaskQueue = NULL; int32_t initTaskQueue() { - double factor = 4.0; - - int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2); - int32_t queueSize = tsMaxConnections * 2; - pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc"); + pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc"); if (NULL == pTaskQueue) { qError("failed to init task queue"); return -1; } - qDebug("task queue is initialized, numOfThreads: %d", numOfThreads); + qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads); return 0; }