diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 60be1ca74dd241d478346030039a29fef8fc814b..990dec09d24178a170b130ea2dbc047a426fc983 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -68,6 +68,8 @@ extern int32_t tsNumOfVnodeFetchThreads; extern int32_t tsNumOfVnodeWriteThreads; extern int32_t tsNumOfVnodeSyncThreads; extern int32_t tsNumOfVnodeMergeThreads; +extern int32_t tsNumOfQnodeQueryThreads; +extern int32_t tsNumOfQnodeFetchThreads; // monitor extern bool tsEnableMonitor; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 73086045bbf010e1027a793281e8b036bd867515..7f79be2ab2c21f6f6b017152c91b678e6267da2d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -62,6 +62,8 @@ int32_t tsNumOfVnodeFetchThreads = 2; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeMergeThreads = 2; +int32_t tsNumOfQnodeQueryThreads = 2; +int32_t tsNumOfQnodeFetchThreads = 2; // monitor bool tsEnableMonitor = true; @@ -409,6 +411,14 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeMergeThreads = TRANGE(tsNumOfVnodeMergeThreads, 1, 1); if (cfgAddInt32(pCfg, "numOfVnodeMergeThreads", tsNumOfVnodeMergeThreads, 1, 1024, 0) != 0) return -1; + tsNumOfQnodeQueryThreads = tsNumOfCores / 2; + tsNumOfQnodeQueryThreads = TMIN(tsNumOfQnodeQueryThreads, 1); + if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; + + tsNumOfQnodeFetchThreads = tsNumOfCores / 2; + tsNumOfQnodeFetchThreads = TRANGE(tsNumOfQnodeFetchThreads, 2, 4); + if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; + if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; @@ -540,6 +550,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32; + tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; + tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/qm/qmWorker.c b/source/dnode/mgmt/qm/qmWorker.c index c9090337abad9cac9fa98b2be240de09c946f773..db0752949dc4a5a689824a7a8312d404a4622474 100644 --- a/source/dnode/mgmt/qm/qmWorker.c +++ b/source/dnode/mgmt/qm/qmWorker.c @@ -106,13 +106,8 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { } int32_t qmStartWorker(SQnodeMgmt *pMgmt) { - int32_t maxFetchThreads = 4; - int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores); - int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * 1), 1); - int32_t maxQueryThreads = minQueryThreads; - - SSingleWorkerCfg queryCfg = {.min = minQueryThreads, - .max = maxQueryThreads, + SSingleWorkerCfg queryCfg = {.min = tsNumOfVnodeQueryThreads, + .max = tsNumOfVnodeQueryThreads, .name = "qnode-query", .fp = (FItem)qmProcessQueryQueue, .param = pMgmt}; @@ -122,8 +117,8 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { return -1; } - SSingleWorkerCfg fetchCfg = {.min = minFetchThreads, - .max = maxFetchThreads, + SSingleWorkerCfg fetchCfg = {.min = tsNumOfQnodeFetchThreads, + .max = tsNumOfQnodeFetchThreads, .name = "qnode-fetch", .fp = (FItem)qmProcessFetchQueue, .param = pMgmt};