Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f9f5e600
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f9f5e600
编写于
3月 24, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/scheduler
上级
4cd1d75a
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
35 addition
and
8 deletion
+35
-8
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+22
-7
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+13
-1
未找到文件。
source/libs/qworker/src/qworker.c
浏览文件 @
f9f5e600
...
@@ -519,7 +519,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
...
@@ -519,7 +519,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
while
(
true
)
{
while
(
true
)
{
QW_TASK_DLOG
(
"start to execTask, loopIdx:%d"
,
i
++
);
QW_TASK_DLOG
(
"start to execTask, loopIdx:%d"
,
i
++
);
taosSsleep
(
20
);
code
=
qExecTask
(
*
taskHandle
,
&
pRes
,
&
useconds
);
code
=
qExecTask
(
*
taskHandle
,
&
pRes
,
&
useconds
);
if
(
code
)
{
if
(
code
)
{
QW_TASK_ELOG
(
"qExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
QW_TASK_ELOG
(
"qExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
...
@@ -730,9 +731,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
...
@@ -730,9 +731,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
}
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
dropConnection
=
&
ctx
->
connInfo
;
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
dropConnection
=
NULL
;
qwBuildAndSendDropRsp
(
&
ctx
->
connInfo
,
code
);
QW_TASK_DLOG
(
"drop rsp send, handle:%p, code:%x - %s"
,
ctx
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
dropConnection
=
&
ctx
->
connInfo
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_DROPPED
);
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_DROPPED
);
break
;
break
;
}
}
...
@@ -764,9 +769,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
...
@@ -764,9 +769,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
}
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
dropConnection
=
&
ctx
->
connInfo
;
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
dropConnection
=
NULL
;
qwBuildAndSendDropRsp
(
&
ctx
->
connInfo
,
code
);
QW_TASK_DLOG
(
"drop rsp send, handle:%p, code:%x - %s"
,
ctx
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
dropConnection
=
&
ctx
->
connInfo
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_DROPPED
);
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_DROPPED
);
}
}
...
@@ -847,6 +856,9 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
...
@@ -847,6 +856,9 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_TASK_WLOG
(
"drop received at wrong phase %s"
,
qwPhaseStr
(
phase
));
QW_TASK_WLOG
(
"drop received at wrong phase %s"
,
qwPhaseStr
(
phase
));
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
}
qwBuildAndSendDropRsp
(
&
ctx
->
connInfo
,
code
);
QW_TASK_DLOG
(
"drop rsp send, handle:%p, code:%x - %s"
,
ctx
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
...
@@ -1163,7 +1175,7 @@ _return:
...
@@ -1163,7 +1175,7 @@ _return:
int32_t
qwProcessDrop
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
int32_t
qwProcessDrop
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
bool
needRsp
=
false
;
bool
rsped
=
false
;
SQWTaskCtx
*
ctx
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
bool
locked
=
false
;
bool
locked
=
false
;
...
@@ -1184,13 +1196,16 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
...
@@ -1184,13 +1196,16 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET
(
qwKillTaskHandle
(
QW_FPARAMS
(),
ctx
));
QW_ERR_JRET
(
qwKillTaskHandle
(
QW_FPARAMS
(),
ctx
));
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_DROPPING
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_DROPPING
);
}
else
if
(
ctx
->
phase
>
0
)
{
}
else
if
(
ctx
->
phase
>
0
)
{
qwBuildAndSendDropRsp
(
&
ctx
->
connInfo
,
code
);
QW_TASK_DLOG
(
"drop rsp send, handle:%p, code:%x - %s"
,
ctx
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
needRsp
=
true
;
rsped
=
true
;
}
else
{
}
else
{
// task not started
// task not started
}
}
if
(
!
needRsp
)
{
if
(
!
rsped
)
{
ctx
->
connInfo
.
handle
==
qwMsg
->
connInfo
.
handle
;
ctx
->
connInfo
.
handle
==
qwMsg
->
connInfo
.
handle
;
ctx
->
connInfo
.
ahandle
=
qwMsg
->
connInfo
.
ahandle
;
ctx
->
connInfo
.
ahandle
=
qwMsg
->
connInfo
.
ahandle
;
...
@@ -1215,7 +1230,7 @@ _return:
...
@@ -1215,7 +1230,7 @@ _return:
qwReleaseTaskCtx
(
mgmt
,
ctx
);
qwReleaseTaskCtx
(
mgmt
,
ctx
);
}
}
if
(
TSDB_CODE_SUCCESS
!=
code
||
needRsp
)
{
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
qwBuildAndSendDropRsp
(
&
qwMsg
->
connInfo
,
code
);
qwBuildAndSendDropRsp
(
&
qwMsg
->
connInfo
,
code
);
QW_TASK_DLOG
(
"drop rsp send, handle:%p, code:%x - %s"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
QW_TASK_DLOG
(
"drop rsp send, handle:%p, code:%x - %s"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
}
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
f9f5e600
...
@@ -147,11 +147,23 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
...
@@ -147,11 +147,23 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_FETCH_RSP
:
if
(
lastMsgType
!=
reqMsgType
&&
-
1
!=
lastMsgType
)
{
SCH_TASK_ELOG
(
"rsp msg type mis-match, last sent msgType:%s, rspType:%s"
,
TMSG_INFO
(
lastMsgType
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
taskStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
taskStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%s, rspType:%s"
,
jobTaskStatusStr
(
taskStatus
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_CREATE_TABLE_RSP
:
case
TDMT_VND_CREATE_TABLE_RSP
:
case
TDMT_VND_SUBMIT_RSP
:
case
TDMT_VND_SUBMIT_RSP
:
case
TDMT_VND_FETCH_RSP
:
break
;
break
;
default:
default:
SCH_TASK_ELOG
(
"unknown rsp msg, type:%s, status:%s"
,
TMSG_INFO
(
msgType
),
jobTaskStatusStr
(
taskStatus
));
SCH_TASK_ELOG
(
"unknown rsp msg, type:%s, status:%s"
,
TMSG_INFO
(
msgType
),
jobTaskStatusStr
(
taskStatus
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录