Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
13bab114
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
13bab114
编写于
7月 24, 2023
作者:
H
Haojun Liao
提交者:
GitHub
7月 24, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #22155 from taosdata/fix/3_liaohj
fix(stream): refactor the halt status to check more status.
上级
5f25d70f
1367552f
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
74 addition
and
17 deletion
+74
-17
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+3
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+11
-9
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+4
-5
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+56
-3
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
13bab114
...
...
@@ -610,6 +610,9 @@ int32_t streamRestoreParam(SStreamTask* pTask);
int32_t
streamSetStatusNormal
(
SStreamTask
*
pTask
);
const
char
*
streamGetTaskStatusStr
(
int32_t
status
);
void
streamTaskPause
(
SStreamTask
*
pTask
);
void
streamTaskResume
(
SStreamTask
*
pTask
);
void
streamTaskHalt
(
SStreamTask
*
pTask
);
void
streamTaskResumeFromHalt
(
SStreamTask
*
pTask
);
void
streamTaskDisablePause
(
SStreamTask
*
pTask
);
void
streamTaskEnablePause
(
SStreamTask
*
pTask
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
13bab114
...
...
@@ -1156,12 +1156,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep
(
100
);
}
// todo fix the race condition, when pause msg is received from mnode, add lock here
streamTaskHalt
(
pTask
);
// now we can stop the stream task execution
ASSERT
(
pStreamTask
->
status
.
taskStatus
==
TASK_STATUS__NORMAL
);
// todo upgrade the statu to be HALT from PAUSE or NORMAL
pStreamTask
->
status
.
taskStatus
=
TASK_STATUS__HALT
;
// todo disable the pause
tqDebug
(
"s-task:%s level:%d status is set to halt by fill-history task:%s"
,
pStreamTask
->
id
.
idStr
,
pStreamTask
->
info
.
taskLevel
,
id
);
...
...
@@ -1522,11 +1521,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
return
-
1
;
}
if
(
streamTaskShouldPause
(
&
pTask
->
status
))
{
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
pTask
->
status
.
keepTaskStatus
);
// todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal
streamTaskResume
(
pTask
);
int32_t
level
=
pTask
->
info
.
taskLevel
;
int8_t
status
=
pTask
->
status
.
taskStatus
;
if
(
status
==
TASK_STATUS__NORMAL
||
status
==
TASK_STATUS__SCAN_HISTORY
)
{
// no lock needs to secure the access of the version
if
(
igUntreated
&&
pTask
->
info
.
taskL
evel
==
TASK_LEVEL__SOURCE
&&
!
pTask
->
info
.
fillHistory
)
{
if
(
igUntreated
&&
l
evel
==
TASK_LEVEL__SOURCE
&&
!
pTask
->
info
.
fillHistory
)
{
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion
(
pTask
->
exec
.
pWalReader
,
sversion
);
tqDebug
(
"vgId:%d s-task:%s resume to exec, prev paused version:%"
PRId64
", start from vnode ver:%"
PRId64
...
...
@@ -1537,9 +1539,9 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
vgId
,
pTask
->
id
.
idStr
,
pTask
->
chkInfo
.
currentVer
,
sversion
,
pTask
->
status
.
schedStatus
);
}
if
(
pTask
->
info
.
fillHistory
&&
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
)
{
if
(
level
==
TASK_LEVEL__SOURCE
&&
pTask
->
info
.
fillHistory
)
{
streamStartRecoverTask
(
pTask
,
igUntreated
);
}
else
if
(
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
&&
taosQueueItemSize
(
pTask
->
inputQueue
->
queue
)
==
0
)
{
}
else
if
(
level
==
TASK_LEVEL__SOURCE
&&
(
taosQueueItemSize
(
pTask
->
inputQueue
->
queue
)
==
0
)
)
{
tqStartStreamTasks
(
pTq
);
}
else
{
streamSchedExec
(
pTask
);
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
13bab114
...
...
@@ -345,7 +345,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
double
el
=
(
taosGetTimestampMs
()
-
st
)
/
1000
.
0
;
if
(
el
>
0
)
{
qDebug
(
"s-task:%s wait for stream task:%s for %.2fs to
handle all data in inputQ
"
,
pTask
->
id
.
idStr
,
qDebug
(
"s-task:%s wait for stream task:%s for %.2fs to
be idle
"
,
pTask
->
id
.
idStr
,
pStreamTask
->
id
.
idStr
,
el
);
}
}
...
...
@@ -377,13 +377,13 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
}
else
{
ASSERT
(
status
==
TASK_STATUS__SCAN_HISTORY
);
pStreamTask
->
status
.
taskStatus
=
TASK_STATUS__HALT
;
qDebug
(
"s-task:%s halt by related fill
history task:%s"
,
pStreamTask
->
id
.
idStr
,
pTask
->
id
.
idStr
);
qDebug
(
"s-task:%s halt by related fill
-
history task:%s"
,
pStreamTask
->
id
.
idStr
,
pTask
->
id
.
idStr
);
}
// wait for the stream task to handle all in the inputQ, and to be idle
waitForTaskIdle
(
pTask
,
pStreamTask
);
// In case of sink tasks, no need to
be halted for
them.
// In case of sink tasks, no need to
halt
them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
// When a task is idle with halt status, all data in inputQ are consumed.
...
...
@@ -405,8 +405,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState
(
pTask
);
streamTaskReloadState
(
pStreamTask
);
// reset the status of stream task
streamSetStatusNormal
(
pStreamTask
);
streamTaskResumeFromHalt
(
pStreamTask
);
pTask
->
status
.
taskStatus
=
TASK_STATUS__DROPPING
;
qDebug
(
"s-task:%s fill-history task set status to be dropping, save the state into disk"
,
pTask
->
id
.
idStr
);
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
13bab114
...
...
@@ -774,7 +774,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
pRange
->
range
.
minVer
=
0
;
pRange
->
range
.
maxVer
=
ver
;
qDebug
(
"s-task:%s level:%d related
-
fill-history task exists, update stream calc time window:%"
PRId64
" - %"
PRId64
qDebug
(
"s-task:%s level:%d related
fill-history task exists, update stream calc time window:%"
PRId64
" - %"
PRId64
", verRang:%"
PRId64
" - %"
PRId64
,
pTask
->
id
.
idStr
,
pTask
->
info
.
taskLevel
,
pRange
->
window
.
skey
,
pRange
->
window
.
ekey
,
pRange
->
range
.
minVer
,
pRange
->
range
.
maxVer
);
...
...
@@ -806,6 +806,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
streamTaskDoCheckDownstreamTasks
(
pTask
);
}
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
void
streamTaskPause
(
SStreamTask
*
pTask
)
{
SStreamMeta
*
pMeta
=
pTask
->
pMeta
;
...
...
@@ -824,11 +825,17 @@ void streamTaskPause(SStreamTask* pTask) {
}
while
(
!
pTask
->
status
.
pauseAllowed
||
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__HALT
))
{
if
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__DROPPING
)
{
status
=
pTask
->
status
.
taskStatus
;
if
(
status
==
TASK_STATUS__DROPPING
)
{
qDebug
(
"vgId:%d s-task:%s task already dropped, do nothing"
,
pMeta
->
vgId
,
pTask
->
id
.
idStr
);
return
;
}
if
(
status
==
TASK_STATUS__STOP
||
status
==
TASK_STATUS__PAUSE
)
{
qDebug
(
"vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing"
,
pMeta
->
vgId
,
pTask
->
id
.
idStr
,
str
);
return
;
}
qDebug
(
"s-task:%s wait for the task can be paused, vgId:%d"
,
pTask
->
id
.
idStr
,
pMeta
->
vgId
);
taosMsleep
(
100
);
}
...
...
@@ -841,6 +848,17 @@ void streamTaskPause(SStreamTask* pTask) {
streamGetTaskStatusStr
(
pTask
->
status
.
keepTaskStatus
),
(
int32_t
)
el
);
}
void
streamTaskResume
(
SStreamTask
*
pTask
)
{
int8_t
status
=
pTask
->
status
.
taskStatus
;
if
(
status
==
TASK_STATUS__PAUSE
)
{
pTask
->
status
.
taskStatus
=
pTask
->
status
.
keepTaskStatus
;
pTask
->
status
.
keepTaskStatus
=
TASK_STATUS__NORMAL
;
qDebug
(
"s-task:%s resume from pause"
,
pTask
->
id
.
idStr
);
}
else
{
qError
(
"s-task:%s not in pause, failed to resume, status:%s"
,
pTask
->
id
.
idStr
,
streamGetTaskStatusStr
(
status
));
}
}
// todo fix race condition
void
streamTaskDisablePause
(
SStreamTask
*
pTask
)
{
// pre-condition check
...
...
@@ -858,3 +876,38 @@ void streamTaskEnablePause(SStreamTask* pTask) {
qDebug
(
"s-task:%s enable task pause"
,
pTask
->
id
.
idStr
);
pTask
->
status
.
pauseAllowed
=
1
;
}
void
streamTaskHalt
(
SStreamTask
*
pTask
)
{
int8_t
status
=
pTask
->
status
.
taskStatus
;
if
(
status
==
TASK_STATUS__DROPPING
||
status
==
TASK_STATUS__STOP
)
{
return
;
}
if
(
status
==
TASK_STATUS__HALT
)
{
return
;
}
// upgrade to halt status
if
(
status
==
TASK_STATUS__PAUSE
)
{
qDebug
(
"s-task:%s upgrade status to %s from %s"
,
pTask
->
id
.
idStr
,
streamGetTaskStatusStr
(
TASK_STATUS__HALT
),
streamGetTaskStatusStr
(
TASK_STATUS__PAUSE
));
}
else
{
qDebug
(
"s-task:%s halt task"
,
pTask
->
id
.
idStr
);
}
pTask
->
status
.
keepTaskStatus
=
status
;
pTask
->
status
.
taskStatus
=
TASK_STATUS__HALT
;
}
void
streamTaskResumeFromHalt
(
SStreamTask
*
pTask
)
{
const
char
*
id
=
pTask
->
id
.
idStr
;
int8_t
status
=
pTask
->
status
.
taskStatus
;
if
(
status
!=
TASK_STATUS__HALT
)
{
qError
(
"s-task:%s not in halt status, status:%s"
,
id
,
streamGetTaskStatusStr
(
status
));
return
;
}
pTask
->
status
.
taskStatus
=
pTask
->
status
.
keepTaskStatus
;
pTask
->
status
.
keepTaskStatus
=
TASK_STATUS__NORMAL
;
qDebug
(
"s-task:%s resume from halt, current status:%s"
,
id
,
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
));
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录