Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4b8cd944
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
4b8cd944
编写于
1月 11, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
1月 11, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9711 from taosdata/feature/qnode
Feature/qnode
上级
9f1f333d
7850f2a9
变更
4
展开全部
隐藏空白更改
内联
并排
Showing
4 changed file
with
509 addition
and
266 deletion
+509
-266
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-0
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+14
-8
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+487
-251
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+7
-7
未找到文件。
include/libs/qcom/query.h
浏览文件 @
4b8cd944
...
@@ -35,6 +35,7 @@ enum {
...
@@ -35,6 +35,7 @@ enum {
JOB_TASK_STATUS_CANCELLING
,
JOB_TASK_STATUS_CANCELLING
,
JOB_TASK_STATUS_CANCELLED
,
JOB_TASK_STATUS_CANCELLED
,
JOB_TASK_STATUS_DROPPING
,
JOB_TASK_STATUS_DROPPING
,
JOB_TASK_STATUS_FREEING
,
};
};
typedef
struct
STableComInfo
{
typedef
struct
STableComInfo
{
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
4b8cd944
...
@@ -61,14 +61,17 @@ typedef struct SSchLevel {
...
@@ -61,14 +61,17 @@ typedef struct SSchLevel {
typedef
struct
SSchTask
{
typedef
struct
SSchTask
{
uint64_t
taskId
;
// task id
uint64_t
taskId
;
// task id
SRWLatch
lock
;
// task lock
SSchLevel
*
level
;
// level
SSchLevel
*
level
;
// level
SSubplan
*
plan
;
// subplan
SSubplan
*
plan
;
// subplan
char
*
msg
;
// operator tree
char
*
msg
;
// operator tree
int32_t
msgLen
;
// msg length
int32_t
msgLen
;
// msg length
int8_t
status
;
// task status
int8_t
status
;
// task status
SQueryNodeAddr
execAddr
;
// task actual executed node address
int32_t
lastMsgType
;
// last sent msg type
SQueryNodeAddr
succeedAddr
;
// task executed success node address
int8_t
candidateIdx
;
// current try condidation index
int8_t
candidateIdx
;
// current try condidation index
SArray
*
candidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
SArray
*
candidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
SArray
*
execAddrs
;
// all tried node for current task, element is SQueryNodeAddr
SQueryProfileSummary
summary
;
// task execution summary
SQueryProfileSummary
summary
;
// task execution summary
int32_t
childReady
;
// child task ready number
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
...
@@ -96,22 +99,24 @@ typedef struct SSchJob {
...
@@ -96,22 +99,24 @@ typedef struct SSchJob {
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
int8_t
status
;
int32_t
ref
;
int8_t
status
;
SQueryNodeAddr
resNode
;
SQueryNodeAddr
resNode
;
tsem_t
rspSem
;
tsem_t
rspSem
;
int
32_t
userFetch
;
int
8_t
userFetch
;
int32_t
remoteFetch
;
int32_t
remoteFetch
;
SSchTask
*
fetchTask
;
SSchTask
*
fetchTask
;
int32_t
errCode
;
int32_t
errCode
;
void
*
res
;
void
*
res
;
//TODO free it or not
int32_t
resNumOfRows
;
int32_t
resNumOfRows
;
SQueryProfileSummary
summary
;
SQueryProfileSummary
summary
;
}
SSchJob
;
}
SSchJob
;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
...
@@ -125,8 +130,9 @@ typedef struct SSchJob {
...
@@ -125,8 +130,9 @@ typedef struct SSchJob {
#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) qWarn("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...
@@ -137,7 +143,7 @@ typedef struct SSchJob {
...
@@ -137,7 +143,7 @@ typedef struct SSchJob {
static
int32_t
schLaunchTask
(
SSchJob
*
job
,
SSchTask
*
task
);
static
int32_t
schLaunchTask
(
SSchJob
*
job
,
SSchTask
*
task
);
static
int32_t
schBuildAndSendMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
);
static
int32_t
schBuildAndSendMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
SQueryNodeAddr
*
addr
,
int32_t
msgType
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
4b8cd944
此差异已折叠。
点击以展开。
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
4b8cd944
...
@@ -36,7 +36,7 @@
...
@@ -36,7 +36,7 @@
namespace
{
namespace
{
extern
"C"
int32_t
sch
ProcessRsp
Msg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
extern
"C"
int32_t
sch
HandleResponse
Msg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
void
schtBuildQueryDag
(
SQueryDag
*
dag
)
{
void
schtBuildQueryDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000001
;
uint64_t
qId
=
0x0000000000000001
;
...
@@ -188,7 +188,7 @@ void *schtSendRsp(void *param) {
...
@@ -188,7 +188,7 @@ void *schtSendRsp(void *param) {
SShellSubmitRsp
rsp
=
{
0
};
SShellSubmitRsp
rsp
=
{
0
};
rsp
.
affectedRows
=
10
;
rsp
.
affectedRows
=
10
;
sch
ProcessRsp
Msg
(
job
,
task
,
TDMT_VND_SUBMIT
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
sch
HandleResponse
Msg
(
job
,
task
,
TDMT_VND_SUBMIT
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
}
...
@@ -233,7 +233,7 @@ TEST(queryTest, normalCase) {
...
@@ -233,7 +233,7 @@ TEST(queryTest, normalCase) {
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
SQueryTableRsp
rsp
=
{
0
};
code
=
sch
ProcessRsp
Msg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
HandleResponse
Msg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
...
@@ -244,7 +244,7 @@ TEST(queryTest, normalCase) {
...
@@ -244,7 +244,7 @@ TEST(queryTest, normalCase) {
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
SResReadyRsp
rsp
=
{
0
};
code
=
sch
ProcessRsp
Msg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
HandleResponse
Msg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
...
@@ -255,7 +255,7 @@ TEST(queryTest, normalCase) {
...
@@ -255,7 +255,7 @@ TEST(queryTest, normalCase) {
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
SQueryTableRsp
rsp
=
{
0
};
code
=
sch
ProcessRsp
Msg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
HandleResponse
Msg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
...
@@ -266,7 +266,7 @@ TEST(queryTest, normalCase) {
...
@@ -266,7 +266,7 @@ TEST(queryTest, normalCase) {
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
SResReadyRsp
rsp
=
{
0
};
code
=
sch
ProcessRsp
Msg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
HandleResponse
Msg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
...
@@ -275,7 +275,7 @@ TEST(queryTest, normalCase) {
...
@@ -275,7 +275,7 @@ TEST(queryTest, normalCase) {
SRetrieveTableRsp
rsp
=
{
0
};
SRetrieveTableRsp
rsp
=
{
0
};
rsp
.
completed
=
1
;
rsp
.
completed
=
1
;
rsp
.
numOfRows
=
10
;
rsp
.
numOfRows
=
10
;
code
=
sch
ProcessRsp
Msg
(
job
,
NULL
,
TDMT_VND_FETCH
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
HandleResponse
Msg
(
job
,
NULL
,
TDMT_VND_FETCH
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录