Commit 0f59e881 (unverified)

Merge pull request #9201 from taosdata/feature/scheduler

Feature/scheduler

Authored by Shengliang Guan on Dec 20, 2021; committed by GitHub on Dec 20, 2021.
Parents: 0b579e58, 97da2ee9

Showing 13 changed files with 576 additions and 42 deletions (+576 -42):
include/common/taosmsg.h                  +22   -2
include/libs/catalog/catalog.h             +1   -1
include/libs/planner/planner.h             +2   -2
include/libs/scheduler/scheduler.h         +4   -3
include/util/taoserror.h                   +1   -0
source/libs/catalog/src/catalog.c         +10   -0
source/libs/planner/inc/plannerInt.h       +1   -1
source/libs/planner/src/physicalPlan.c     +1   -1
source/libs/planner/src/planner.c          +2   -2
source/libs/scheduler/CMakeLists.txt       +2   -2
source/libs/scheduler/inc/schedulerInt.h  +35   -9
source/libs/scheduler/src/scheduler.c    +494  -19
source/util/src/terror.c                   +1   -0
include/common/taosmsg.h

@@ -51,6 +51,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" )
+TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" )

// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )

@@ -232,9 +234,9 @@ typedef struct {
} SEpAddrMsg;

typedef struct {
-  char*    fqdn;
+  char     fqdn[TSDB_FQDN_LEN];
  uint16_t port;
-} SEpAddr1;
+} SEpAddr;

typedef struct {
  int32_t numOfVnodes;

@@ -1079,6 +1081,24 @@ typedef struct {
  /* data */
} SUpdateTagValRsp;

+typedef struct SSchedulerQueryMsg {
+  uint64_t queryId;
+  uint64_t taskId;
+  uint32_t contentLen;
+  char     msg[];
+} SSchedulerQueryMsg;
+
+typedef struct SSchedulerReadyMsg {
+  uint64_t queryId;
+  uint64_t taskId;
+} SSchedulerReadyMsg;
+
+typedef struct SSchedulerFetchMsg {
+  uint64_t queryId;
+  uint64_t taskId;
+} SSchedulerFetchMsg;
+
#pragma pack(pop)

#ifdef __cplusplus
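The three new scheduler message structs are fixed-size headers except for SSchedulerQueryMsg, which carries the serialized subplan in a flexible array member, so a sender has to allocate header plus payload in one block. A minimal sketch of that pattern, using only what the struct definitions above provide (the helper name buildSchedulerQueryMsg is illustrative, not part of this commit; it needs <stdlib.h> and <string.h>):

    // Hypothetical helper: wrap a serialized subplan string into an SSchedulerQueryMsg.
    static SSchedulerQueryMsg *buildSchedulerQueryMsg(uint64_t queryId, uint64_t taskId, const char *subplan) {
      uint32_t len = (uint32_t)strlen(subplan);
      SSchedulerQueryMsg *pMsg = calloc(1, sizeof(SSchedulerQueryMsg) + len);
      if (NULL == pMsg) {
        return NULL;
      }
      pMsg->queryId = queryId;          // job identifier
      pMsg->taskId = taskId;            // task identifier within the job
      pMsg->contentLen = len;           // payload length, header excluded
      memcpy(pMsg->msg, subplan, len);  // serialized subplan follows the header
      return pMsg;
    }

schAsyncSendMsg() in source/libs/scheduler/src/scheduler.c, further down in this diff, builds the message with the same calloc/memcpy sequence.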
include/libs/catalog/catalog.h

@@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const S
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);

-int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
+int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
include/libs/planner/planner.h

@@ -141,8 +141,8 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
-// @eps Execution location of this group of datasource subplans, is an array of SEpAddr structures
-int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps);
+// @ep one execution location of this group of datasource subplans
+int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
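The old signature let the caller pass an SArray of candidate locations in one call; the new one binds exactly one SEpAddr per call, and the scheduler invokes it once for every child task that completes. The corresponding call site added later in this diff (schProcessOnTaskSuccess in scheduler.c) is:

    // one call per finished child: record that child's actual execution address
    // as a datasource location of the parent subplan
    SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));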
include/libs/scheduler/scheduler.h

@@ -21,9 +21,10 @@ extern "C" {
#endif

#include "planner.h"
+#include "catalog.h"

typedef struct SSchedulerCfg {
  int32_t clusterType;
} SSchedulerCfg;

typedef struct SQueryProfileSummary {

@@ -55,9 +56,9 @@ typedef struct SQueryProfileSummary {
 * @param pJob
 * @return
 */
-int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob);
+int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob);

-int32_t scheduleFetchRows(void *pJob, void *data);
+int32_t scheduleFetchRows(void *pJob, void **data);

/**
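With the extra parameters the scheduler can resolve qnode endpoints through the catalog on its own, so a caller only hands over the catalog handle, the rpc object, the mnode endpoint set and the query DAG. A minimal sketch of the intended call sequence, assuming those objects already exist (variable names are illustrative and error handling is trimmed):

    void *pJob = NULL;
    void *pRes = NULL;

    // hand the DAG to the scheduler; it builds the task tree and launches the lowest level
    int32_t code = scheduleQueryJob(pCatalog, pRpc, &mgmtEpSet, pDag, &pJob);
    if (TSDB_CODE_SUCCESS == code) {
      // blocks on the job's semaphore until the result arrives or the job fails
      code = scheduleFetchRows(pJob, &pRes);
    }
    scheduleFreeJob(pJob);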
include/util/taoserror.h

@@ -493,6 +493,7 @@ int32_t* taosGetErrno();
//scheduler
#define TSDB_CODE_SCH_STATUS_ERROR      TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error
+#define TSDB_CODE_SCH_INTERNAL_ERROR    TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error
source/libs/catalog/src/catalog.c

@@ -634,6 +634,16 @@ _return:
  CTG_RET(code);
}

+int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) {
+  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeEpSet) {
+    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
void catalogDestroy(void) {
  if (ctgMgmt.pCluster) {
    taosHashCleanup(ctgMgmt.pCluster); //TBD
source/libs/planner/inc/plannerInt.h

@@ -100,7 +100,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag);
-int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps);
+int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str);
int32_t stringToSubplan(const char* str, SSubplan** subplan);
source/libs/planner/src/physicalPlan.c

@@ -225,6 +225,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
  return TSDB_CODE_SUCCESS;
}

-int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps) {
+int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
  //todo
}
source/libs/planner/src/planner.c

@@ -46,8 +46,8 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
  return TSDB_CODE_SUCCESS;
}

-int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps) {
-  return setSubplanExecutionNode(subplan, templateId, eps);
+int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
+  return setSubplanExecutionNode(subplan, templateId, ep);
}

int32_t qSubPlanToString(const SSubplan *subplan, char** str) {
source/libs/scheduler/CMakeLists.txt

@@ -9,5 +9,5 @@ target_include_directories(
target_link_libraries(
    scheduler
-    PRIVATE os util planner qcom common
-)
\ No newline at end of file
+    PRIVATE os util planner qcom common catalog transport
+)
source/libs/scheduler/inc/schedulerInt.h

@@ -27,6 +27,9 @@ extern "C" {
#include "thash.h"

#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
+#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA

enum {
  SCH_STATUS_NOT_START = 1,

@@ -39,22 +42,27 @@ enum {
typedef struct SSchedulerMgmt {
  uint64_t      taskId;
  SSchedulerCfg cfg;
  SHashObj     *Jobs;  // key: queryId, value: SQueryJob*
} SSchedulerMgmt;

typedef struct SQueryTask {
-  uint64_t             taskId;     // task id
-  char                *msg;        // operator tree
-  int8_t               status;     // task status
-  SQueryProfileSummary summary;    // task execution summary
+  uint64_t             taskId;     // task id
+  SSubplan            *plan;       // subplan
+  char                *msg;        // operator tree
+  int8_t               status;     // task status
+  SEpAddr              execAddr;   // task actual executed node address
+  SQueryProfileSummary summary;    // task execution summary
+  int32_t              childReady; // child task ready number
+  SArray              *children;   // the datasource tasks,from which to fetch the result, element is SQueryTask*
+  SArray              *parents;    // the data destination tasks, get data from current task, element is SQueryTask*
} SQueryTask;

typedef struct SQueryLevel {
  int32_t  level;
  int8_t   status;
  int32_t  taskNum;
  SArray  *subTasks;  // Element is SQueryTask
  SArray  *subPlans;  // Element is SSubplan
} SQueryLevel;

typedef struct SQueryJob {

@@ -63,13 +71,29 @@ typedef struct SQueryJob {
  int32_t  levelIdx;
  int8_t   status;
  SQueryProfileSummary summary;
-  SArray  *levels;    // Element is SQueryLevel, starting from 0.
-  SArray  *subPlans;  // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
+  SEpSet   dataSrcEps;
+  SEpAddr  resEp;
+  struct SCatalog *catalog;
+  void    *rpc;
+  SEpSet  *mgmtEpSet;
+  tsem_t   rspSem;
+  int32_t  userFetch;
+  int32_t  remoteFetch;
+  void    *res;
+  SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
+  SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
+  SArray  *levels;     // Element is SQueryLevel, starting from 0.
+  SArray  *subPlans;   // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
} SQueryJob;

#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)

#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)

@@ -77,6 +101,8 @@ typedef struct SQueryJob {
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)

+extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task);
+
#ifdef __cplusplus
}
#endif
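The error-handling macros encode the two cleanup styles used throughout the new scheduler.c: SCH_ERR_RET sets terrno and returns immediately, while SCH_ERR_JRET assumes a local code variable and a _return: label and jumps there so shared cleanup can run. A minimal sketch of the _JRET pattern, with a hypothetical step function standing in for the real calls:

    int32_t schDoStep(SQueryJob *job);  // hypothetical step that may fail

    int32_t schExample(SQueryJob *job) {
      int32_t code = 0;
      SArray *tmp = taosArrayInit(4, POINTER_BYTES);
      if (NULL == tmp) {
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);  // nothing allocated yet: return directly
      }

      SCH_ERR_JRET(schDoStep(job));                // on failure, jump to the cleanup label

      taosArrayDestroy(tmp);
      return TSDB_CODE_SUCCESS;

    _return:
      taosArrayDestroy(tmp);                       // shared cleanup path
      SCH_RET(code);
    }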
source/libs/scheduler/src/scheduler.c

@@ -16,6 +16,7 @@
#include "schedulerInt.h"
#include "taosmsg.h"
#include "query.h"
+#include "catalog.h"

SSchedulerMgmt schMgmt = {0};

@@ -50,23 +51,109 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
*/
}

int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
  for (int32_t i = 0; i < job->levelNum; ++i) {
    SQueryLevel *level = taosArrayGet(job->levels, i);

    for (int32_t m = 0; m < level->taskNum; ++m) {
      SQueryTask *task = taosArrayGet(level->subTasks, m);
      SSubplan *plan = task->plan;
      int32_t childNum = (int32_t)taosArrayGetSize(plan->pChildern);
      int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents);

      if (childNum > 0) {
        task->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == task->children) {
          qError("taosArrayInit %d failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan *child = taosArrayGet(plan->pChildern, n);
        SQueryTask *childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (childTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->children, &childTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        task->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == task->parents) {
          qError("taosArrayInit %d failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan *parent = taosArrayGet(plan->pParents, n);
        SQueryTask *parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (parentTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->parents, &parentTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    }
  }

  SQueryLevel *level = taosArrayGet(job->levels, 0);
  if (level->taskNum > 1) {
    qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SQueryTask *task = taosArrayGet(level->subTasks, 0);
  if (task->parents && taosArrayGetSize(task->parents) > 0) {
    qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
  int32_t code = 0;

  job->queryId = dag->queryId;

  if (dag->numOfSubplans <= 0) {
    qError("invalid subplan num:%d", dag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
  if (levelNum <= 0) {
    qError("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == planToTask) {
    qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel));
  if (NULL == job->levels) {
    qError("taosArrayInit %d failed", levelNum);
-    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->levelNum = levelNum;
  job->levelIdx = levelNum - 1;
  job->status = SCH_STATUS_NOT_START;

  job->subPlans = dag->pSubplans;

@@ -77,32 +164,39 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
  level.status = SCH_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    level.level = i;
    levelPlans = taosArrayGetP(dag->pSubplans, i);
    if (NULL == levelPlans) {
      qError("no level plans for level %d", i);
-      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
+      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
    if (levelPlanNum <= 0) {
      qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
-      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
+      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    level.taskNum = levelPlanNum;
    level.subPlans = levelPlans;
    level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask));
    if (NULL == level.subTasks) {
      qError("taosArrayInit %d failed", levelPlanNum);
-      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < levelPlanNum; ++n) {
      SSubplan *plan = taosArrayGet(levelPlans, n);
      SQueryTask *task = taosArrayGet(level.subTasks, n);

      task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
      task->plan = plan;
      task->status = SCH_STATUS_NOT_START;

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) {
        qError("taosHashPut failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }

    if (NULL == taosArrayPush(job->levels, &level)) {

@@ -111,6 +205,12 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
    }
  }

  SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  return TSDB_CODE_SUCCESS;

_return:

@@ -118,20 +218,319 @@ _return:
    taosArrayDestroy(level.subTasks);
  }

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
  if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_HAS_QNODE_IN_CLUSTER(schMgmt.cfg.clusterType)) {
    SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet));
  } else {
    for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) {
      strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
      epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

      ++epSet->numOfEps;
    }
  }

  return TSDB_CODE_SUCCESS;
}

int32_t schJobExecute(SQueryJob *job) {
  switch (job->status) {
    case SCH_STATUS_NOT_START:

int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
  if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
  if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
    return TSDB_CODE_SUCCESS;
  }

  if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  return TSDB_CODE_SUCCESS;
}

int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
  int32_t msgSize = 0;
  void *msg = NULL;

  switch (msgType) {
    case TSDB_MSG_TYPE_QUERY: {
      if (NULL == task->msg) {
        qError("query msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      int32_t len = strlen(task->msg);
      msgSize = sizeof(SSchedulerQueryMsg) + len;
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSchedulerQueryMsg *pMsg = msg;

      pMsg->queryId = job->queryId;
      pMsg->taskId = task->taskId;
      pMsg->contentLen = len;
      memcpy(pMsg->msg, task->msg, len);
      break;
    }
    case TSDB_MSG_TYPE_RSP_READY: {
      msgSize = sizeof(SSchedulerReadyMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSchedulerReadyMsg *pMsg = msg;
      pMsg->queryId = job->queryId;
      pMsg->taskId = task->taskId;
      break;
    }
    case TSDB_MSG_TYPE_FETCH: {
      msgSize = sizeof(SSchedulerFetchMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSchedulerFetchMsg *pMsg = msg;
      pMsg->queryId = job->queryId;
      pMsg->taskId = task->taskId;
      break;
    }
    default:
      qError("unknown msg type:%d", msgType);
      break;
  }

  //TODO SEND MSG

  return TSDB_CODE_SUCCESS;
}

int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) {
  // TODO set retry or not based on task type/errCode/retry times/job status/available eps...
  // TODO if needRetry, set task retry info

  *needRetry = false;

  return TSDB_CODE_SUCCESS;
}

int32_t schFetchFromRemote(SQueryJob *job) {
  int32_t code = 0;

  if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) {
    qInfo("prior fetching not finished");
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TSDB_MSG_TYPE_FETCH));

  return TSDB_CODE_SUCCESS;

_return:
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  return code;
}

int32_t schProcessOnJobSuccess(SQueryJob *job) {
  job->status = SCH_STATUS_SUCCEED;

  if (job->userFetch) {
    SCH_ERR_RET(schFetchFromRemote(job));
  }

  return TSDB_CODE_SUCCESS;
}

int32_t schProcessOnJobFailure(SQueryJob *job) {
  job->status = SCH_STATUS_FAILED;

  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  if (job->userFetch) {
    tsem_post(&job->rspSem);
  }

  return TSDB_CODE_SUCCESS;
}

int32_t schProcessOnDataFetched(SQueryJob *job) {
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);
}

int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
  bool moved = false;

  SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
  if (!moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
    return TSDB_CODE_SUCCESS;
  }

  task->status = SCH_STATUS_SUCCEED;

  int32_t parentNum = (int32_t)taosArrayGetSize(task->parents);
  if (parentNum == 0) {
    if (task->plan->level != 0) {
      qError("level error");
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
    job->resEp.port = task->execAddr.port;

    SCH_ERR_RET(schProcessOnJobSuccess(job));

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
    strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
    job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

    ++job->dataSrcEps.numOfEps;
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    SQueryTask *par = taosArrayGet(task->parents, i);

    ++par->childReady;

    SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));

    if (SCH_TASK_READY_TO_LUNCH(par)) {
      SCH_ERR_RET(schLaunchTask(job, task));
    }
  }

  return TSDB_CODE_SUCCESS;
}

int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) {
  bool needRetry = false;
  SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));

  if (!needRetry) {
    SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);

    job->status = SCH_STATUS_FAILED;
    SCH_ERR_RET(schProcessOnJobFailure(job));

    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schLaunchTask(job, task));

  return TSDB_CODE_SUCCESS;
}

int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) {
  int32_t code = 0;

  switch (msgType) {
    case TSDB_MSG_TYPE_QUERY:
      if (rspCode != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
      } else {
        code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY);
        if (code) {
          goto _task_error;
        }
      }
      break;
    case TSDB_MSG_TYPE_RSP_READY:
      if (rspCode != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
      } else {
        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    case TSDB_MSG_TYPE_FETCH:
      SCH_ERR_JRET(rspCode);
      SCH_ERR_JRET(schProcessOnDataFetched(job));
      break;
    default:
      SCH_JOB_ERR_LOG("invalid job status:%d", job->status);
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      qError("unknown msg type:%d received", msgType);
      return TSDB_CODE_QRY_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;

_task_error:
  SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
  return TSDB_CODE_SUCCESS;

_return:
  code = schProcessOnJobFailure(job);
  return code;
}

int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
  SSubplan *plan = task->plan;
  SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
  if (plan->execEpSet.numOfEps <= 0) {
    SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet));
  }

  SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY));

  SCH_ERR_RET(schPushTaskToExecList(job, task));

  task->status = SCH_STATUS_EXECUTING;

  return TSDB_CODE_SUCCESS;
}

int32_t schLaunchJob(SQueryJob *job) {
  SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx);
  for (int32_t i = 0; i < level->taskNum; ++i) {
    SQueryTask *task = taosArrayGet(level->subTasks, i);
    SCH_ERR_RET(schLaunchTask(job, task));
  }

  job->status = SCH_STATUS_EXECUTING;

  return TSDB_CODE_SUCCESS;
}

@@ -141,12 +540,16 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
    SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER);
  }

  if (cfg) {
    schMgmt.cfg = *cfg;
  }

  return TSDB_CODE_SUCCESS;
}

-int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) {
-  if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
+int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) {
+  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

@@ -156,12 +559,37 @@ int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->catalog = pCatalog;
  job->rpc = pRpc;
  job->mgmtEpSet = (SEpSet *)pMgmtEps;

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));

  SCH_ERR_JRET(schJobExecute(job));

  job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->execTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *(SQueryJob **)pJob = job;

  job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->succTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&job->rspSem, 0, 0);

  if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) {
    qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  job->status = SCH_STATUS_NOT_START;

  SCH_ERR_JRET(schLaunchJob(job));

  *(SQueryJob **)pJob = job;

  return TSDB_CODE_SUCCESS;

_return:

@@ -172,17 +600,64 @@ _return:
  SCH_RET(code);
}

-int32_t scheduleFetchRows(void *pJob, void *data);
+int32_t scheduleFetchRows(void *pJob, void **data) {
  if (NULL == pJob || NULL == data) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

-int32_t scheduleCancelJob(void *pJob);

  SQueryJob *job = pJob;
  int32_t code = 0;

  if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
    qError("prior fetching not finished");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  if (job->status == SCH_STATUS_SUCCEED) {
    SCH_ERR_JRET(schFetchFromRemote(job));
  }

  tsem_wait(&job->rspSem);

  *data = job->res;
  job->res = NULL;

_return:
  atomic_val_compare_exchange_32(&job->userFetch, 1, 0);

  return code;
}

int32_t scheduleCancelJob(void *pJob) {
  //TODO

  return TSDB_CODE_SUCCESS;
}

void scheduleFreeJob(void *pJob) {
  if (NULL == pJob) {
    return;
  }

  SQueryJob *job = pJob;

  if (job->status > 0) {
    if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) {
      qError("remove job:%"PRIx64"from mgmt failed", job->queryId);
      // maybe already freed
      return;
    }

    if (job->status == SCH_STATUS_EXECUTING) {
      scheduleCancelJob(pJob);
    }
  }

  //TODO free job
}

void schedulerDestroy(void) {
  if (schMgmt.Jobs) {
-    taosHashCleanup(schMgmt.Jobs); //TBD
+    taosHashCleanup(schMgmt.Jobs); //TODO
    schMgmt.Jobs = NULL;
  }
}
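schAsyncSendMsg leaves the actual transport call as "//TODO SEND MSG", so the response path is not wired up yet either; schHandleRspMsg is the entry point it will eventually feed. A sketch of how a completion handler might locate the job and task and dispatch into it, assuming only the structures introduced above (the callback signature and the way queryId/taskId are obtained are illustrative, not part of this commit):

    // Hypothetical rpc completion handler inside scheduler.c (schMgmt is file-local).
    static void schQueryRspHandler(uint64_t queryId, uint64_t taskId, int32_t msgType, int32_t rspCode) {
      SQueryJob **job = taosHashGet(schMgmt.Jobs, &queryId, sizeof(queryId));
      if (NULL == job || NULL == *job) {
        return;  // job unknown or already freed
      }

      SQueryTask **task = taosHashGet((*job)->execTasks, &taskId, sizeof(taskId));
      if (NULL == task || NULL == *task) {
        return;  // task is not in the executing set
      }

      schHandleRspMsg(*job, *task, msgType, rspCode);
    }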
source/util/src/terror.c

@@ -499,6 +499,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error"
//scheduler
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
+TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error")

#ifdef TAOS_ERROR_C