Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ad032a0a
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ad032a0a
编写于
8月 09, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(stream): stream recover
上级
48449e80
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
79 addition
and
24 deletion
+79
-24
examples/rust
examples/rust
+0
-1
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+1
-0
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+9
-9
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+10
-0
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+59
-13
tools/taos-tools
tools/taos-tools
+0
-1
未找到文件。
rust
@
7ed7a977
Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12
include/libs/stream/tstream.h
浏览文件 @
ad032a0a
...
...
@@ -513,6 +513,7 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t
streamMetaBegin
(
SStreamMeta
*
pMeta
);
int32_t
streamMetaCommit
(
SStreamMeta
*
pMeta
);
int32_t
streamMetaRollBack
(
SStreamMeta
*
pMeta
);
int32_t
streamLoadTasks
(
SStreamMeta
*
pMeta
);
#ifdef __cplusplus
}
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
ad032a0a
...
...
@@ -19,23 +19,23 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
void
*
exec
=
pTask
->
exec
.
executor
;
// set input
SStreamQueueItem
*
pItem
=
(
SStreamQueueItem
*
)
data
;
const
SStreamQueueItem
*
pItem
=
(
const
SStreamQueueItem
*
)
data
;
if
(
pItem
->
type
==
STREAM_INPUT__GET_RES
)
{
SStreamTrigger
*
pTrigger
=
(
SStreamTrigger
*
)
data
;
const
SStreamTrigger
*
pTrigger
=
(
const
SStreamTrigger
*
)
data
;
qSetMultiStreamInput
(
exec
,
pTrigger
->
pBlock
,
1
,
STREAM_INPUT__DATA_BLOCK
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
);
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
const
SStreamDataSubmit
*
pSubmit
=
(
const
SStreamDataSubmit
*
)
data
;
qDebug
(
"task %d %p set submit input %p %p %d 1"
,
pTask
->
taskId
,
pTask
,
pSubmit
,
pSubmit
->
data
,
*
pSubmit
->
dataRef
);
qSetMultiStreamInput
(
exec
,
pSubmit
->
data
,
1
,
STREAM_INPUT__DATA_SUBMIT
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
SArray
*
blocks
=
pBlock
->
blocks
;
const
SStreamDataBlock
*
pBlock
=
(
const
SStreamDataBlock
*
)
data
;
SArray
*
blocks
=
pBlock
->
blocks
;
qDebug
(
"task %d %p set ssdata input"
,
pTask
->
taskId
,
pTask
);
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_INPUT__DATA_BLOCK
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__MERGED_SUBMIT
)
{
SStreamMergedSubmit
*
pMerged
=
(
SStreamMergedSubmit
*
)
data
;
SArray
*
blocks
=
pMerged
->
reqs
;
const
SStreamMergedSubmit
*
pMerged
=
(
const
SStreamMergedSubmit
*
)
data
;
SArray
*
blocks
=
pMerged
->
reqs
;
qDebug
(
"task %d %p set submit input (merged), batch num: %d"
,
pTask
->
taskId
,
pTask
,
(
int32_t
)
blocks
->
size
);
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_INPUT__MERGED_SUBMIT
);
}
else
{
...
...
@@ -51,8 +51,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
}
if
(
output
==
NULL
)
{
if
(
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
SSDataBlock
block
=
{
0
};
SStreamDataBlock
*
pRetrieveBlock
=
(
SStreamDataBlock
*
)
data
;
SSDataBlock
block
=
{
0
};
const
SStreamDataBlock
*
pRetrieveBlock
=
(
const
SStreamDataBlock
*
)
data
;
ASSERT
(
taosArrayGetSize
(
pRetrieveBlock
->
blocks
)
==
1
);
assignOneDataBlock
(
&
block
,
taosArrayGet
(
pRetrieveBlock
->
blocks
,
0
));
block
.
info
.
type
=
STREAM_PULL_OVER
;
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
ad032a0a
...
...
@@ -48,8 +48,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta
->
ahandle
=
ahandle
;
pMeta
->
expandFunc
=
expandFunc
;
if
(
streamLoadTasks
(
pMeta
)
<
0
)
{
goto
_err
;
}
return
pMeta
;
_err:
if
(
pMeta
->
path
)
taosMemoryFree
(
pMeta
->
path
);
if
(
pMeta
->
pTasks
)
taosHashCleanup
(
pMeta
->
pTasks
);
if
(
pMeta
->
pStateDb
)
tdbTbClose
(
pMeta
->
pStateDb
);
if
(
pMeta
->
pTaskDb
)
tdbTbClose
(
pMeta
->
pTaskDb
);
if
(
pMeta
->
db
)
tdbClose
(
pMeta
->
db
);
taosMemoryFree
(
pMeta
);
return
NULL
;
}
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
ad032a0a
...
...
@@ -130,14 +130,13 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
return
0
;
}
int32_t
stream
CheckSinkLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
int32_t
stream
SaveStateInfo
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
void
*
buf
=
NULL
;
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
);
int32_t
sz
=
taosArrayGetSize
(
pTask
->
checkpointInfo
);
SStreamMultiVgCheckpointInfo
checkpoint
;
checkpoint
.
checkpointId
=
0
;
checkpoint
.
checkpointId
=
atomic_fetch_add_32
(
&
pTask
->
nextCheckId
,
1
)
;
checkpoint
.
checkTs
=
taosGetTimestampMs
();
checkpoint
.
streamId
=
pTask
->
streamId
;
checkpoint
.
taskId
=
pTask
->
taskId
;
...
...
@@ -169,16 +168,21 @@ int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
goto
FAIL
;
}
int32_t
sz
=
taosArrayGetSize
(
pTask
->
checkpointInfo
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SStreamCheckpointInfo
*
pCheck
=
taosArrayGet
(
pTask
->
checkpointInfo
,
i
);
pCheck
->
stateSaveVer
=
pCheck
->
stateProcessedVer
;
}
taosMemoryFree
(
buf
);
return
0
;
FAIL:
if
(
buf
)
taosMemoryFree
(
buf
);
return
-
1
;
return
0
;
}
int32_t
streamRecoverSinkLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
);
// load status
int32_t
streamLoadStateInfo
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
void
*
pVal
=
NULL
;
int32_t
vLen
=
0
;
if
(
tdbTbGet
(
pMeta
->
pStateDb
,
&
pTask
->
taskId
,
sizeof
(
void
*
),
&
pVal
,
&
vLen
)
<
0
)
{
...
...
@@ -196,29 +200,71 @@ int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
return
0
;
}
int32_t
streamCheckAggLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
int32_t
streamSaveSinkLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
);
return
streamSaveStateInfo
(
pMeta
,
pTask
);
}
int32_t
streamRecoverSinkLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
);
return
streamLoadStateInfo
(
pMeta
,
pTask
);
}
int32_t
streamSaveAggLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
);
// save and copy state
// TODO save and copy state
// save state info
if
(
streamSaveStateInfo
(
pMeta
,
pTask
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
streamFetchSinkStatus
(
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
!=
TASK_LEVEL__SINK
);
// set self status to recover_phase1
// build fetch status msg
// send fetch msg
return
0
;
}
int32_t
streamProcessFetchStatusRsp
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
,
void
*
msg
)
{
// if failed, set timer and retry
// if successful
// add rsp state to partial recover hash
// if complete, begin actual recover
return
0
;
}
int32_t
streamRecoverAggLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
);
// try recover sink level
// after all sink level recovered, choose current state backend to recover
// recover sink level
// after all sink level recovered
// choose suitable state to recover
return
0
;
}
int32_t
stream
Check
SourceLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
int32_t
stream
Save
SourceLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
);
// try recover agg level
//
// TODO: save and copy state
return
0
;
}
int32_t
streamRecoverSourceLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
);
// if totLevel == 3
// fetch agg state
// recover from local state to agg state, not send msg
// recover from agg state to most recent log v1
// enable input queue, set status recover_phase2
// recover from v1 to queue msg v2, set status normal
// if totLevel == 2
// fetch sink state
// recover from local state to sink state v1, send msg
// enable input queue, set status recover_phase2
// recover from v1 to queue msg v2, set status normal
return
0
;
}
...
...
taos-tools
@
3c7dafee
Subproject commit 3c7dafeea3e558968165b73bee0f51024898e3da
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录