Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a702b146
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a702b146
编写于
12月 18, 2021
作者:
D
dapan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix compile error
上级
c9940c0e
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
46 addition
and
40 deletion
+46
-40
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+2
-1
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+1
-1
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+1
-1
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+2
-2
source/libs/scheduler/CMakeLists.txt
source/libs/scheduler/CMakeLists.txt
+2
-2
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+4
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+34
-32
未找到文件。
include/libs/scheduler/scheduler.h
浏览文件 @
a702b146
...
...
@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "planner.h"
#include "catalog.h"
typedef
struct
SSchedulerCfg
{
int32_t
clusterType
;
...
...
@@ -57,7 +58,7 @@ typedef struct SQueryProfileSummary {
*/
int32_t
scheduleQueryJob
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleFetchRows
(
void
*
pRpc
,
void
*
pJob
,
void
*
data
);
int32_t
scheduleFetchRows
(
void
*
pRpc
,
void
*
pJob
,
void
*
*
data
);
/**
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
a702b146
...
...
@@ -100,7 +100,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
);
int32_t
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
Array
*
eps
);
int32_t
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
EpAddr
*
ep
);
int32_t
subPlanToString
(
const
SSubplan
*
pPhyNode
,
char
**
str
);
int32_t
stringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
);
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
a702b146
...
...
@@ -225,6 +225,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return
TSDB_CODE_SUCCESS
;
}
int32_t
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
Array
*
eps
)
{
int32_t
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
EpAddr
*
ep
)
{
//todo
}
source/libs/planner/src/planner.c
浏览文件 @
a702b146
...
...
@@ -46,8 +46,8 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
return
TSDB_CODE_SUCCESS
;
}
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
Array
*
eps
)
{
return
setSubplanExecutionNode
(
subplan
,
templateId
,
ep
s
);
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
EpAddr
*
ep
)
{
return
setSubplanExecutionNode
(
subplan
,
templateId
,
ep
);
}
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
)
{
...
...
source/libs/scheduler/CMakeLists.txt
浏览文件 @
a702b146
...
...
@@ -9,5 +9,5 @@ target_include_directories(
target_link_libraries
(
scheduler
PRIVATE os util planner qcom common
)
\ No newline at end of file
PRIVATE os util planner qcom common catalog transport
)
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
a702b146
...
...
@@ -54,7 +54,7 @@ typedef struct SQueryTask {
SEpAddr
execAddr
;
// task actual executed node address
SQueryProfileSummary
summary
;
// task execution summary
int32_t
childReady
;
// child task ready number
SArray
*
child
er
n
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
child
re
n
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
}
SQueryTask
;
...
...
@@ -87,6 +87,7 @@ typedef struct SQueryJob {
SArray
*
subPlans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
SQueryJob
;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN)
...
...
@@ -99,6 +100,8 @@ typedef struct SQueryJob {
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
extern
int32_t
schTaskRun
(
SQueryJob
*
job
,
SQueryTask
*
task
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
a702b146
...
...
@@ -62,8 +62,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
int32_t
parentNum
=
(
int32_t
)
taosArrayGetSize
(
plan
->
pParents
);
if
(
childNum
>
0
)
{
task
->
child
er
n
=
taosArrayInit
(
childNum
,
POINTER_BYTES
);
if
(
NULL
==
task
->
child
er
n
)
{
task
->
child
re
n
=
taosArrayInit
(
childNum
,
POINTER_BYTES
);
if
(
NULL
==
task
->
child
re
n
)
{
qError
(
"taosArrayInit %d failed"
,
childNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -77,7 +77,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
if
(
NULL
==
taosArrayPush
(
task
->
child
er
n
,
&
childTask
))
{
if
(
NULL
==
taosArrayPush
(
task
->
child
re
n
,
&
childTask
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -233,7 +233,7 @@ int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
SCH_ERR_RET
(
catalogGetQnodeList
(
job
->
catalog
,
job
->
rpc
,
job
->
mgmtEpSet
,
epSet
));
}
else
{
for
(
int32_t
i
=
0
;
i
<
job
->
dataSrcEps
.
numOfEps
;
++
i
)
{
strncpy
(
epSet
->
fqdn
[
epSet
->
numOfEps
],
&
job
->
dataSrcEps
.
fqdn
[
i
],
sizeof
(
job
->
dataSrcEps
.
fqdn
));
strncpy
(
epSet
->
fqdn
[
epSet
->
numOfEps
],
job
->
dataSrcEps
.
fqdn
[
i
],
sizeof
(
job
->
dataSrcEps
.
fqdn
[
i
]
));
epSet
->
port
[
epSet
->
numOfEps
]
=
job
->
dataSrcEps
.
port
[
i
];
++
epSet
->
numOfEps
;
...
...
@@ -244,6 +244,32 @@ int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
}
int32_t
schPushTaskToExecList
(
SQueryJob
*
job
,
SQueryTask
*
task
)
{
if
(
0
!=
taosHashPut
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
qError
(
"taosHashPut failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schMoveTaskToSuccList
(
SQueryJob
*
job
,
SQueryTask
*
task
,
bool
*
moved
)
{
if
(
0
!=
taosHashRemove
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
)))
{
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed"
,
task
->
taskId
);
return
TSDB_CODE_SUCCESS
;
}
if
(
0
!=
taosHashPut
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
qError
(
"taosHashPut failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
*
moved
=
true
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schAsyncLaunchTask
(
SQueryJob
*
job
,
SQueryTask
*
task
)
{
}
...
...
@@ -271,7 +297,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET
(
schMoveTaskToSuccList
(
job
,
task
,
&
moved
));
if
(
!
moved
)
{
qWarn
(
"task[%d] already moved"
,
task
->
taskId
);
SCH_TASK_ERR_LOG
(
"task may already moved, status:%d"
,
task
->
status
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -285,7 +311,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
strncpy
(
job
->
resEp
.
fqdn
,
task
->
execAddr
.
fqdn
,
sizeof
(
job
->
resEp
.
fqdn
));
job
->
resEp
.
port
=
task
->
execAddr
.
port
;
SCH_ERR_RET
(
schProcessOnJobSuccess
());
SCH_ERR_RET
(
schProcessOnJobSuccess
(
job
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -331,30 +357,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
}
int32_t
schPushTaskToExecList
(
SQueryJob
*
job
,
SQueryTask
*
task
)
{
if
(
0
!=
taosHashPut
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
qError
(
"taosHashPut failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schMoveTaskToSuccList
(
SQueryJob
*
job
,
SQueryTask
*
task
,
bool
*
moved
)
{
if
(
0
!=
taosHashRemove
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
)))
{
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed"
,
task
->
taskId
);
return
TSDB_CODE_SUCCESS
}
if
(
0
!=
taosHashPut
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
qError
(
"taosHashPut failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
*
moved
=
true
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schTaskRun
(
SQueryJob
*
job
,
SQueryTask
*
task
)
{
...
...
@@ -367,7 +369,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET
(
schAsyncLaunchTask
(
job
,
task
));
SCH_ERR_RET
(
schPushTaskToExecList
(
job
,
task
))
SCH_ERR_RET
(
schPushTaskToExecList
(
job
,
task
))
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -410,7 +412,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM
job
->
catalog
=
pCatalog
;
job
->
rpc
=
pRpc
;
job
->
mgmtEpSet
=
pMgmtEps
;
job
->
mgmtEpSet
=
(
SEpSet
*
)
pMgmtEps
;
SCH_ERR_JRET
(
schValidateAndBuildJob
(
pDag
,
job
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录