Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a0f7cc67
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a0f7cc67
编写于
7月 14, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: add schedule policy
上级
a8569fc4
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
189 addition
and
68 deletion
+189
-68
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+6
-2
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+1
-2
source/client/src/clientMain.c
source/client/src/clientMain.c
+1
-0
source/libs/command/inc/commandInt.h
source/libs/command/inc/commandInt.h
+4
-0
source/libs/command/src/command.c
source/libs/command/src/command.c
+37
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+6
-0
source/libs/qworker/inc/qwMsg.h
source/libs/qworker/inc/qwMsg.h
+1
-1
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+1
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+1
-1
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+47
-38
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+54
-8
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+27
-14
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+1
-1
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+2
-0
未找到文件。
include/libs/scheduler/scheduler.h
浏览文件 @
a0f7cc67
...
...
@@ -26,8 +26,10 @@ extern "C" {
extern
tsem_t
schdRspSem
;
typedef
struct
SSchedulerCfg
{
uint32_t
maxJobNum
;
int32_t
maxNodeTableNum
;
uint32_t
maxJobNum
;
int32_t
maxNodeTableNum
;
SCH_POLICY
schPolicy
;
bool
enableReSchedule
;
}
SSchedulerCfg
;
typedef
struct
SQueryProfileSummary
{
...
...
@@ -96,6 +98,8 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
void
schedulerStopQueryHb
(
void
*
pTrans
);
int32_t
schedulerUpdatePolicy
(
int32_t
policy
);
int32_t
schedulerEnableReSchedule
(
bool
enableResche
);
/**
* Cancel query job
...
...
source/client/src/clientEnv.c
浏览文件 @
a0f7cc67
...
...
@@ -359,8 +359,7 @@ void taos_init_imp(void) {
SCatalogCfg
cfg
=
{.
maxDBCacheNum
=
100
,
.
maxTblCacheNum
=
100
};
catalogInit
(
&
cfg
);
SSchedulerCfg
scfg
=
{.
maxJobNum
=
100
};
schedulerInit
(
&
scfg
);
schedulerInit
();
tscDebug
(
"starting to initialize TAOS driver"
);
taosSetCoreDump
(
true
);
...
...
source/client/src/clientMain.c
浏览文件 @
a0f7cc67
...
...
@@ -129,6 +129,7 @@ void taos_close(TAOS *taos) {
STscObj
*
pObj
=
acquireTscObj
(
*
(
int64_t
*
)
taos
);
if
(
NULL
==
pObj
)
{
taosMemoryFree
(
taos
);
return
;
}
...
...
source/libs/command/inc/commandInt.h
浏览文件 @
a0f7cc67
...
...
@@ -77,6 +77,10 @@ extern "C" {
#define EXPLAIN_MODE_FORMAT "mode=%s"
#define EXPLAIN_STRING_TYPE_FORMAT "%s"
#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
#define COMMAND_ENABLE_RESCHEDULE "enableReSchedule"
typedef
struct
SExplainGroup
{
int32_t
nodeNum
;
int32_t
physiPlanExecNum
;
...
...
source/libs/command/src/command.c
浏览文件 @
a0f7cc67
...
...
@@ -479,7 +479,42 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR
return
execShowCreateTable
(
pStmt
,
pRsp
);
}
static
int32_t
execAlterCmd
(
char
*
cmd
,
char
*
value
,
bool
*
processed
)
{
int32_t
code
=
0
;
if
(
0
==
strcasecmp
(
cmd
,
COMMAND_RESET_LOG
))
{
taosResetLog
();
cfgDumpCfg
(
tsCfg
,
0
,
false
);
}
else
if
(
0
==
strcasecmp
(
cmd
,
COMMAND_SCHEDULE_POLICY
))
{
code
=
schedulerUpdatePolicy
(
atoi
(
value
));
}
else
if
(
0
==
strcasecmp
(
cmd
,
COMMAND_ENABLE_RESCHEDULE
))
{
code
=
schedulerEnableReSchedule
(
atoi
(
value
));
}
else
{
goto
_return
;
}
*
processed
=
true
;
_return:
if
(
code
)
{
terrno
=
code
;
}
return
code
;
}
static
int32_t
execAlterLocal
(
SAlterLocalStmt
*
pStmt
)
{
bool
processed
=
false
;
if
(
execAlterCmd
(
pStmt
->
config
,
pStmt
->
value
,
&
processed
))
{
return
terrno
;
}
if
(
processed
)
{
goto
_return
;
}
if
(
cfgSetItem
(
tsCfg
,
pStmt
->
config
,
pStmt
->
value
,
CFG_STYPE_ALTER_CMD
))
{
return
terrno
;
}
...
...
@@ -488,6 +523,8 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
return
terrno
;
}
_return:
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
a0f7cc67
...
...
@@ -388,6 +388,11 @@ static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode((SNode*
static
void
destroyExprNode
(
SExprNode
*
pExpr
)
{
taosArrayDestroy
(
pExpr
->
pAssociation
);
}
static
void
nodesDestroyNodePointer
(
void
*
node
)
{
SNode
*
pNode
=
*
(
SNode
**
)
node
;
nodesDestroyNode
(
pNode
);
}
void
nodesDestroyNode
(
SNode
*
pNode
)
{
if
(
NULL
==
pNode
)
{
return
;
...
...
@@ -718,6 +723,7 @@ void nodesDestroyNode(SNode* pNode) {
}
taosArrayDestroy
(
pQuery
->
pDbList
);
taosArrayDestroy
(
pQuery
->
pTableList
);
taosArrayDestroyEx
(
pQuery
->
pPlaceholderValues
,
nodesDestroyNodePointer
);
break
;
}
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
{
...
...
source/libs/qworker/inc/qwMsg.h
浏览文件 @
a0f7cc67
...
...
@@ -24,7 +24,7 @@ extern "C" {
#include "dataSinkMgt.h"
int32_t
qwAbortPrerocessQuery
(
QW_FPARAMS_DEF
);
int32_t
qwPrerocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwPre
p
rocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
const
char
*
sql
);
int32_t
qwProcessCQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessReady
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
...
...
source/libs/qworker/src/qwMsg.c
浏览文件 @
a0f7cc67
...
...
@@ -318,7 +318,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg
qwMsg
=
{.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
};
QW_SCH_TASK_DLOG
(
"prerocessQuery start, handle:%p"
,
pMsg
->
info
.
handle
);
QW_ERR_RET
(
qwPrerocessQuery
(
QW_FPARAMS
(),
&
qwMsg
));
QW_ERR_RET
(
qwPre
p
rocessQuery
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"prerocessQuery end, handle:%p"
,
pMsg
->
info
.
handle
);
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
a0f7cc67
...
...
@@ -469,7 +469,7 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
}
int32_t
qwPrerocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
int32_t
qwPre
p
rocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
int32_t
code
=
0
;
bool
queryRsped
=
false
;
SSubplan
*
plan
=
NULL
;
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
a0f7cc67
...
...
@@ -28,15 +28,6 @@ extern "C" {
#include "trpc.h"
#include "command.h"
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum
{
SCH_READ
=
1
,
SCH_WRITE
,
...
...
@@ -54,6 +45,24 @@ typedef enum {
SCH_OP_GET_STATUS
,
}
SCH_OP_TYPE
;
typedef
enum
{
SCH_LOAD_SEQ
=
1
,
SCH_RANDOM
,
SCH_ALL
,
}
SCH_POLICY
;
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
typedef
struct
SSchDebug
{
bool
lockEnable
;
bool
apiEnable
;
...
...
@@ -184,34 +193,36 @@ typedef struct SSchLevel {
typedef
struct
SSchTaskProfile
{
int64_t
startTs
;
int64_t
*
execTime
;
SArray
*
execTime
;
int64_t
waitTime
;
int64_t
endTs
;
}
SSchTaskProfile
;
typedef
struct
SSchTask
{
uint64_t
taskId
;
// task id
SRWLatch
lock
;
// task reentrant lock
int32_t
maxExecTimes
;
// task may exec times
int32_t
execId
;
// task current execute try index
SSchLevel
*
level
;
// level
SRWLatch
planLock
;
// task update plan lock
SSubplan
*
plan
;
// subplan
char
*
msg
;
// operator tree
int32_t
msgLen
;
// msg length
int8_t
status
;
// task status
int32_t
lastMsgType
;
// last sent msg type
int64_t
timeoutUsec
;
// taks timeout useconds before reschedule
SQueryNodeAddr
succeedAddr
;
// task executed success node address
int8_t
candidateIdx
;
// current try condidation index
SArray
*
candidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
SHashObj
*
execNodes
;
// all tried node for current task, element is SSchNodeInfo
SSchTaskProfile
profile
;
// task execution profile
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
void
*
handle
;
// task send handle
bool
registerdHb
;
// registered in hb
uint64_t
taskId
;
// task id
SRWLatch
lock
;
// task reentrant lock
int32_t
maxExecTimes
;
// task max exec times
int32_t
maxRetryTimes
;
// task max retry times
int32_t
retryTimes
;
// task retry times
int32_t
execId
;
// task current execute index
SSchLevel
*
level
;
// level
SRWLatch
planLock
;
// task update plan lock
SSubplan
*
plan
;
// subplan
char
*
msg
;
// operator tree
int32_t
msgLen
;
// msg length
int8_t
status
;
// task status
int32_t
lastMsgType
;
// last sent msg type
int64_t
timeoutUsec
;
// task timeout useconds before reschedule
SQueryNodeAddr
succeedAddr
;
// task executed success node address
int8_t
candidateIdx
;
// current try condidation index
SArray
*
candidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
SHashObj
*
execNodes
;
// all tried node for current task, element is SSchNodeInfo
SSchTaskProfile
profile
;
// task execution profile
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
void
*
handle
;
// task send handle
bool
registerdHb
;
// registered in hb
}
SSchTask
;
typedef
struct
SSchJobAttr
{
...
...
@@ -298,7 +309,6 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_TASK_MAX_EXEC_TIMES(_levelIdx, _levelNum) (SCH_MAX_CANDIDATE_EP_NUM * ((_levelNum) - (_levelIdx)))
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
...
...
@@ -320,8 +330,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.execTime[idx] = us; \
taosArrayPush((_task)->profile.execTime, &us); \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
...
...
@@ -330,8 +339,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.waitTime += us - (_task)->profile.execTime[idx]; \
(_task)->profile.waitTime += us - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
} while (0)
...
...
@@ -339,7 +347,8 @@ extern SSchedulerMgmt schMgmt;
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.execTime[idx] = us - (_task)->profile.execTime[idx]; \
int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
*startts = us - *startts; \
(_task)->profile.endTs = us; \
} while (0)
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
a0f7cc67
...
...
@@ -42,7 +42,18 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
taosHashCleanup
(
pTask
->
execNodes
);
}
taosMemoryFree
(
pTask
->
profile
.
execTime
);
taosArrayDestroy
(
pTask
->
profile
.
execTime
);
}
void
schInitTaskRetryTimes
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSchLevel
*
pLevel
,
int32_t
levelNum
)
{
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
)
||
(
!
SCH_IS_QUERY_JOB
(
pJob
))
||
(
SCH_ALL
!=
schMgmt
.
cfg
.
schPolicy
))
{
pTask
->
maxRetryTimes
=
SCH_MAX_CANDIDATE_EP_NUM
;
}
else
{
int32_t
nodeNum
=
taosArrayGetSize
(
pJob
->
nodeList
);
pTask
->
maxRetryTimes
=
TMAX
(
nodeNum
,
SCH_MAX_CANDIDATE_EP_NUM
);
}
pTask
->
maxExecTimes
=
pTask
->
maxRetryTimes
*
(
levelNum
-
pLevel
->
level
);
}
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
,
int32_t
levelNum
)
{
...
...
@@ -51,12 +62,14 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
pTask
->
execId
=
-
1
;
pTask
->
maxExecTimes
=
SCH_TASK_MAX_EXEC_TIMES
(
pLevel
->
level
,
levelNum
);
pTask
->
timeoutUsec
=
SCH_DEFAULT_TASK_TIMEOUT_USEC
;
pTask
->
taskId
=
schGenTaskId
();
pTask
->
execNodes
=
taosHashInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
pTask
->
profile
.
execTime
=
taosMemoryCalloc
(
pTask
->
maxExecTimes
,
sizeof
(
int64_t
));
schInitTaskRetryTimes
(
pJob
,
pTask
,
pLevel
,
levelNum
);
pTask
->
profile
.
execTime
=
taosArrayInit
(
pTask
->
maxExecTimes
,
sizeof
(
int64_t
));
if
(
NULL
==
pTask
->
execNodes
||
NULL
==
pTask
->
profile
.
execTime
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -67,7 +80,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
_return:
taos
MemoryFreeClear
(
pTask
->
profile
.
execTime
);
taos
ArrayDestroy
(
pTask
->
profile
.
execTime
);
taosHashCleanup
(
pTask
->
execNodes
);
SCH_RET
(
code
);
...
...
@@ -285,6 +298,10 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
}
int32_t
schRescheduleTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
!
schMgmt
.
cfg
.
enableReSchedule
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -320,6 +337,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
taosMemoryFreeClear
(
pTask
->
msg
);
pTask
->
msgLen
=
0
;
pTask
->
lastMsgType
=
0
;
pTask
->
retryTimes
=
0
;
memset
(
&
pTask
->
succeedAddr
,
0
,
sizeof
(
pTask
->
succeedAddr
));
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
...
...
@@ -493,9 +511,15 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
}
}
if
((
pTask
->
retryTimes
+
1
)
>
pTask
->
maxRetryTimes
)
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry since reach max retry times, retryTimes:%d/%d"
,
pTask
->
retryTimes
,
pTask
->
maxRetryTimes
);
return
TSDB_CODE_SUCCESS
;
}
if
((
pTask
->
execId
+
1
)
>=
pTask
->
maxExecTimes
)
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry since reach max
try times, execId:%d"
,
pTask
->
execId
);
SCH_TASK_DLOG
(
"task no more retry since reach max
exec times, execId:%d/%d"
,
pTask
->
execId
,
pTask
->
maxExecTimes
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -647,10 +671,31 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe
int32_t
schSwitchTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
candidateNum
=
taosArrayGetSize
(
pTask
->
candidateAddrs
);
if
(
++
pTask
->
candidateIdx
>=
candidateNum
)
{
pTask
->
candidateIdx
=
0
;
if
(
candidateNum
<=
1
)
{
goto
_return
;
}
switch
(
schMgmt
.
cfg
.
schPolicy
)
{
case
SCH_LOAD_SEQ
:
case
SCH_ALL
:
default:
if
(
++
pTask
->
candidateIdx
>=
candidateNum
)
{
pTask
->
candidateIdx
=
0
;
}
break
;
case
SCH_RANDOM
:
{
int32_t
lastIdx
=
pTask
->
candidateIdx
;
while
(
lastIdx
==
pTask
->
candidateIdx
)
{
pTask
->
candidateIdx
=
taosRand
()
%
candidateNum
;
}
break
;
}
}
SCH_TASK_DLOG
(
"switch task candiateIdx to %d"
,
pTask
->
candidateIdx
);
_return:
SCH_TASK_DLOG
(
"switch task candiateIdx to %d/%d"
,
pTask
->
candidateIdx
,
candidateNum
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -737,6 +782,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
atomic_add_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
pTask
->
execId
++
;
pTask
->
retryTimes
++
;
SCH_TASK_DLOG
(
"start to launch task's %dth exec"
,
pTask
->
execId
);
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
a0f7cc67
...
...
@@ -22,26 +22,19 @@ SSchedulerMgmt schMgmt = {
.
jobRef
=
-
1
,
};
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
)
{
int32_t
schedulerInit
()
{
if
(
schMgmt
.
jobRef
>=
0
)
{
qError
(
"scheduler already initialized"
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
if
(
cfg
)
{
schMgmt
.
cfg
=
*
cfg
;
if
(
schMgmt
.
cfg
.
maxJobNum
==
0
)
{
schMgmt
.
cfg
.
maxJobNum
=
SCHEDULE_DEFAULT_MAX_JOB_NUM
;
}
if
(
schMgmt
.
cfg
.
maxNodeTableNum
<=
0
)
{
schMgmt
.
cfg
.
maxNodeTableNum
=
SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM
;
}
}
else
{
schMgmt
.
cfg
.
maxJobNum
=
SCHEDULE_DEFAULT_MAX_JOB_NUM
;
schMgmt
.
cfg
.
maxNodeTableNum
=
SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM
;
}
schMgmt
.
cfg
.
maxJobNum
=
SCHEDULE_DEFAULT_MAX_JOB_NUM
;
schMgmt
.
cfg
.
maxNodeTableNum
=
SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM
;
schMgmt
.
cfg
.
schPolicy
=
SCHEDULE_DEFAULT_POLICY
;
schMgmt
.
cfg
.
enableReSchedule
=
true
;
qDebug
(
"schedule policy init to %d"
,
schMgmt
.
cfg
.
schPolicy
);
schMgmt
.
jobRef
=
taosOpenRef
(
schMgmt
.
cfg
.
maxJobNum
,
schFreeJobImpl
);
if
(
schMgmt
.
jobRef
<
0
)
{
qError
(
"init schduler jobRef failed, num:%u"
,
schMgmt
.
cfg
.
maxJobNum
);
...
...
@@ -130,6 +123,26 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb
(
pTrans
);
}
int32_t
schedulerUpdatePolicy
(
int32_t
policy
)
{
switch
(
policy
)
{
case
SCH_LOAD_SEQ
:
case
SCH_RANDOM
:
case
SCH_ALL
:
schMgmt
.
cfg
.
schPolicy
=
policy
;
qDebug
(
"schedule policy updated to %d"
,
schMgmt
.
cfg
.
schPolicy
);
break
;
default:
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerEnableReSchedule
(
bool
enableResche
)
{
schMgmt
.
cfg
.
enableReSchedule
=
enableResche
;
return
TSDB_CODE_SUCCESS
;
}
void
schedulerFreeJob
(
int64_t
*
jobId
,
int32_t
errCode
)
{
if
(
0
==
*
jobId
)
{
return
;
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
a0f7cc67
...
...
@@ -477,7 +477,7 @@ void* schtRunJobThread(void *aa) {
schtInitLogFile
();
int32_t
code
=
schedulerInit
(
NULL
);
int32_t
code
=
schedulerInit
();
assert
(
code
==
0
);
...
...
tests/script/api/batchprepare.c
浏览文件 @
a0f7cc67
...
...
@@ -2685,6 +2685,8 @@ int main(int argc, char *argv[])
runAll
(
taos
);
taos_close
(
taos
);
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录