未验证 提交 28ea375f 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20850 from taosdata/enh/ts-3112

enh: add queryMaxConcurrentTables configuration
...@@ -104,6 +104,7 @@ extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row lo ...@@ -104,6 +104,7 @@ extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row lo
// query client // query client
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
extern int32_t tsQueryRspPolicy; extern int32_t tsQueryRspPolicy;
extern int64_t tsQueryMaxConcurrentTables;
extern int32_t tsQuerySmaOptimize; extern int32_t tsQuerySmaOptimize;
extern int32_t tsQueryRsmaTolerance; extern int32_t tsQueryRsmaTolerance;
extern bool tsQueryPlannerTrace; extern bool tsQueryPlannerTrace;
......
...@@ -103,6 +103,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table ...@@ -103,6 +103,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0; int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = false; bool tsEnableQueryHb = false;
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
...@@ -340,6 +341,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -340,6 +341,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
...@@ -735,6 +737,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -735,6 +737,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval; tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
......
...@@ -54,7 +54,6 @@ typedef enum { ...@@ -54,7 +54,6 @@ typedef enum {
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 #define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ #define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20 #define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
...@@ -134,7 +133,7 @@ typedef struct SSchStatusFps { ...@@ -134,7 +133,7 @@ typedef struct SSchStatusFps {
typedef struct SSchedulerCfg { typedef struct SSchedulerCfg {
uint32_t maxJobNum; uint32_t maxJobNum;
int32_t maxNodeTableNum; int64_t maxNodeTableNum;
SCH_POLICY schPolicy; SCH_POLICY schPolicy;
bool enableReSchedule; bool enableReSchedule;
} SSchedulerCfg; } SSchedulerCfg;
...@@ -175,7 +174,7 @@ typedef struct SSchHbCallbackParam { ...@@ -175,7 +174,7 @@ typedef struct SSchHbCallbackParam {
typedef struct SSchFlowControl { typedef struct SSchFlowControl {
SRWLatch lock; SRWLatch lock;
bool sorted; bool sorted;
int32_t tableNumSum; int64_t tableNumSum;
uint32_t execTaskNum; uint32_t execTaskNum;
SArray *taskList; // Element is SSchTask* SArray *taskList; // Element is SSchTask*
} SSchFlowControl; } SSchFlowControl;
......
...@@ -46,7 +46,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { ...@@ -46,7 +46,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t sum = 0; int64_t sum = 0;
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks); int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i); SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
...@@ -55,7 +55,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { ...@@ -55,7 +55,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
} }
if (schMgmt.cfg.maxNodeTableNum <= 0 || sum < schMgmt.cfg.maxNodeTableNum) { if (schMgmt.cfg.maxNodeTableNum <= 0 || sum < schMgmt.cfg.maxNodeTableNum) {
SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum); SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%" PRId64, sum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -68,7 +68,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { ...@@ -68,7 +68,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
SCH_SET_JOB_NEED_FLOW_CTRL(pJob); SCH_SET_JOB_NEED_FLOW_CTRL(pJob);
SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", sum); SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%" PRId64, sum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -94,7 +94,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) { ...@@ -94,7 +94,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
--ctrl->execTaskNum; --ctrl->execTaskNum;
ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum; ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
_return: _return:
...@@ -125,7 +125,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { ...@@ -125,7 +125,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum); ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
*enough = true; *enough = true;
...@@ -142,7 +142,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { ...@@ -142,7 +142,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
break; break;
} }
int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum; int64_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
if (sum <= schMgmt.cfg.maxNodeTableNum) { if (sum <= schMgmt.cfg.maxNodeTableNum) {
ctrl->tableNumSum = sum; ctrl->tableNumSum = sum;
...@@ -173,7 +173,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { ...@@ -173,7 +173,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
_return: _return:
SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d",
((*enough) ? "" : "NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ((*enough) ? "" : "NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum,
ctrl->execTaskNum); ctrl->execTaskNum);
...@@ -203,7 +203,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { ...@@ -203,7 +203,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum; int64_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
int32_t taskNum = taosArrayGetSize(ctrl->taskList); int32_t taskNum = taosArrayGetSize(ctrl->taskList);
int32_t code = 0; int32_t code = 0;
SSchTask *pTask = NULL; SSchTask *pTask = NULL;
...@@ -217,7 +217,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { ...@@ -217,7 +217,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) { if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
continue; continue;
...@@ -228,14 +228,14 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { ...@@ -228,14 +228,14 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
taosArrayRemove(ctrl->taskList, i); taosArrayRemove(ctrl->taskList, i);
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn,
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask)); SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
remainNum -= pTask->plan->execNodeStat.tableNum; remainNum -= pTask->plan->execNodeStat.tableNum;
if (remainNum <= 0) { if (remainNum <= 0) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, ep->port, SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d", ep->fqdn, ep->port,
ctrl->tableNumSum, ctrl->execTaskNum); ctrl->tableNumSum, ctrl->execTaskNum);
break; break;
...@@ -244,7 +244,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { ...@@ -244,7 +244,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
if (i < (taskNum - 1)) { if (i < (taskNum - 1)) {
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList); SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
if (remainNum < pLastTask->plan->execNodeStat.tableNum) { if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d", SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d, smallestInList:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum); ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);
break; break;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "schInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "tglobal.h"
SSchedulerMgmt schMgmt = { SSchedulerMgmt schMgmt = {
.jobRef = -1, .jobRef = -1,
...@@ -30,11 +31,12 @@ int32_t schedulerInit() { ...@@ -30,11 +31,12 @@ int32_t schedulerInit() {
} }
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; schMgmt.cfg.maxNodeTableNum = tsQueryMaxConcurrentTables;
schMgmt.cfg.schPolicy = SCHEDULE_DEFAULT_POLICY; schMgmt.cfg.schPolicy = SCHEDULE_DEFAULT_POLICY;
schMgmt.cfg.enableReSchedule = true; schMgmt.cfg.enableReSchedule = true;
qDebug("schedule policy init to %d", schMgmt.cfg.schPolicy); qDebug("schedule init, policy: %d, maxNodeTableNum: %" PRId64", reSchedule:%d",
schMgmt.cfg.schPolicy, schMgmt.cfg.maxNodeTableNum, schMgmt.cfg.enableReSchedule);
schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
if (schMgmt.jobRef < 0) { if (schMgmt.jobRef < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册