Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c8b18c26
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
c8b18c26
编写于
5月 16, 2022
作者:
L
Liu Jicong
提交者:
GitHub
5月 16, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12516 from taosdata/feature/stream
refactor(stream)
上级
a3d1b0a5
f77f9102
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
212 addition
and
6 deletion
+212
-6
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+4
-1
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+5
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+55
-1
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+148
-4
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
c8b18c26
...
...
@@ -40,6 +40,7 @@ enum {
TASK_INPUT_STATUS__BLOCKED
,
TASK_INPUT_STATUS__RECOVER
,
TASK_INPUT_STATUS__STOP
,
TASK_INPUT_STATUS__FAILED
,
};
enum
{
...
...
@@ -234,7 +235,9 @@ struct SStreamTask {
int8_t
outputStatus
;
STaosQueue
*
inputQ
;
STaosQall
*
inputQAll
;
STaosQueue
*
outputQ
;
STaosQall
*
outputQAll
;
// application storage
void
*
ahandle
;
...
...
@@ -282,7 +285,7 @@ int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
);
int32_t
streamTask
ExecNew
(
SStreamTask
*
pTask
);
int32_t
streamTask
Run
(
SStreamTask
*
pTask
);
int32_t
streamTaskHandleInput
(
SStreamTask
*
pTask
,
void
*
data
);
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
c8b18c26
...
...
@@ -194,6 +194,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
// source
pTask
->
sourceType
=
TASK_SOURCE__MERGE
;
pTask
->
inputType
=
TASK_INPUT_TYPE__DATA_BLOCK
;
// exec
pTask
->
execType
=
TASK_EXEC__NONE
;
...
...
@@ -235,6 +236,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
pTask
->
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
// source
pTask
->
sourceType
=
TASK_SOURCE__MERGE
;
pTask
->
inputType
=
TASK_INPUT_TYPE__DATA_BLOCK
;
// exec
pTask
->
execType
=
TASK_EXEC__NONE
;
...
...
@@ -309,6 +311,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
// source part
pTask
->
sourceType
=
TASK_SOURCE__SCAN
;
pTask
->
inputType
=
TASK_INPUT_TYPE__SUMBIT_BLOCK
;
// sink part
if
(
level
==
0
)
{
...
...
@@ -372,6 +375,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// source part, currently only support multi source
pTask
->
sourceType
=
TASK_SOURCE__PIPE
;
pTask
->
inputType
=
TASK_INPUT_TYPE__DATA_BLOCK
;
// sink part
pTask
->
sinkType
=
TASK_SINK__NONE
;
...
...
@@ -459,6 +463,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// source part
pTask
->
sourceType
=
TASK_SOURCE__MERGE
;
pTask
->
inputType
=
TASK_INPUT_TYPE__DATA_BLOCK
;
// sink part
pTask
->
sinkType
=
TASK_SINK__NONE
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
c8b18c26
...
...
@@ -14,6 +14,7 @@
*/
#include "tq.h"
#include "tqueue.h"
int32_t
tqInit
()
{
//
...
...
@@ -1032,6 +1033,59 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId)
return
0
;
}
int32_t
tqProcessStreamTrigger2
(
STQ
*
pTq
,
SSubmitReq
*
pReq
,
int64_t
ver
)
{
void
*
pIter
=
NULL
;
bool
failed
=
false
;
SStreamDataSubmit
*
pSubmit
=
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
if
(
pSubmit
==
NULL
)
{
failed
=
true
;
}
pSubmit
->
dataRef
=
taosMemoryMalloc
(
sizeof
(
int32_t
));
if
(
pSubmit
->
dataRef
==
NULL
)
{
failed
=
true
;
}
pSubmit
->
type
=
STREAM_DATA_TYPE_SUBMIT_BLOCK
;
pSubmit
->
sourceVer
=
ver
;
pSubmit
->
sourceVg
=
pTq
->
pVnode
->
config
.
vgId
;
pSubmit
->
data
=
pReq
;
*
pSubmit
->
dataRef
=
1
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
pTask
->
inputType
!=
STREAM_INPUT__DATA_SUBMIT
)
continue
;
int8_t
inputStatus
=
atomic_load_8
(
&
pTask
->
inputStatus
);
if
(
inputStatus
==
TASK_INPUT_STATUS__NORMAL
)
{
if
(
failed
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
continue
;
}
streamDataSubmitRefInc
(
pSubmit
);
taosWriteQitem
(
pTask
->
inputQ
,
pSubmit
);
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
status
);
if
(
execStatus
==
TASK_STATUS__IDLE
||
execStatus
==
TASK_STATUS__CLOSING
)
{
// TODO dispatch task launch msg to fetch queue
}
}
else
{
// blocked or stopped, do nothing
}
}
if
(
!
failed
)
{
streamDataSubmitRefDec
(
pSubmit
);
return
0
;
}
else
{
return
-
1
;
}
}
int32_t
tqProcessTaskExec2
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTaskExecReq
req
=
{
0
};
tDecodeSStreamTaskExecReq
(
msg
,
&
req
);
...
...
@@ -1051,7 +1105,7 @@ int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
// try exec
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
if
(
streamTask
ExecNew
(
pTask
)
<
0
)
{
if
(
streamTask
Run
(
pTask
)
<
0
)
{
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__CLOSING
);
goto
FAIL
;
...
...
source/libs/stream/src/tstream.c
浏览文件 @
c8b18c26
...
...
@@ -133,7 +133,144 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
return
inputStatus
;
}
int32_t
streamTaskProcessTriggerReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
char
*
msg
,
int32_t
msgLen
)
{
int32_t
streamTaskExecImpl
(
SStreamTask
*
pTask
,
void
*
data
,
SArray
*
pRes
)
{
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
// set input
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
ASSERT
(
pSubmit
->
type
==
STREAM_INPUT__DATA_SUBMIT
);
qSetStreamInput
(
exec
,
pSubmit
->
data
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
}
else
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_BLOCK
)
{
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
ASSERT
(
pBlock
->
type
==
STREAM_INPUT__DATA_BLOCK
);
SArray
*
blocks
=
pBlock
->
blocks
;
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
}
// exec
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
break
;
taosArrayPush
(
pRes
,
&
output
);
}
// destroy
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
streamDataSubmitRefDec
((
SStreamDataSubmit
*
)
data
);
}
else
{
taosArrayDestroyEx
(((
SStreamDataBlock
*
)
data
)
->
blocks
,
(
FDelete
)
tDeleteSSDataBlock
);
}
return
0
;
}
// TODO: handle version
int32_t
streamTaskExec2
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
return
-
1
;
while
(
1
)
{
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
// first run, from qall, handle failure from last exec
while
(
1
)
{
void
*
data
=
NULL
;
taosGetQitem
(
pTask
->
inputQAll
,
&
data
);
if
(
data
==
NULL
)
break
;
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
taosFreeQitem
(
data
);
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SStreamDataBlock
*
resQ
=
taosAllocateQitem
(
sizeof
(
void
**
),
DEF_QITEM
);
resQ
->
type
=
STREAM_INPUT__DATA_BLOCK
;
resQ
->
blocks
=
pRes
;
taosWriteQitem
(
pTask
->
outputQ
,
resQ
);
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
goto
FAIL
;
}
}
// second run, from inputQ
taosReadAllQitems
(
pTask
->
inputQ
,
pTask
->
inputQAll
);
while
(
1
)
{
void
*
data
=
NULL
;
taosGetQitem
(
pTask
->
inputQAll
,
&
data
);
if
(
data
==
NULL
)
break
;
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
taosFreeQitem
(
data
);
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SStreamDataBlock
*
resQ
=
taosAllocateQitem
(
sizeof
(
void
**
),
DEF_QITEM
);
resQ
->
type
=
STREAM_INPUT__DATA_BLOCK
;
resQ
->
blocks
=
pRes
;
taosWriteQitem
(
pTask
->
outputQ
,
resQ
);
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
goto
FAIL
;
}
}
// set status closing
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__CLOSING
);
// third run, make sure all inputQ is cleared
taosReadAllQitems
(
pTask
->
inputQ
,
pTask
->
inputQAll
);
while
(
1
)
{
void
*
data
=
NULL
;
taosGetQitem
(
pTask
->
inputQAll
,
&
data
);
if
(
data
==
NULL
)
break
;
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
taosFreeQitem
(
data
);
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SStreamDataBlock
*
resQ
=
taosAllocateQitem
(
sizeof
(
void
**
),
DEF_QITEM
);
resQ
->
type
=
STREAM_INPUT__DATA_BLOCK
;
resQ
->
blocks
=
pRes
;
taosWriteQitem
(
pTask
->
outputQ
,
resQ
);
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
goto
FAIL
;
}
}
// set status closing
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__CLOSING
);
// third run, make sure all inputQ is cleared
taosReadAllQitems
(
pTask
->
inputQ
,
pTask
->
inputQAll
);
while
(
1
)
{
void
*
data
=
NULL
;
taosGetQitem
(
pTask
->
inputQAll
,
&
data
);
if
(
data
==
NULL
)
break
;
}
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
);
break
;
}
else
if
(
execStatus
==
TASK_STATUS__CLOSING
)
{
continue
;
}
else
if
(
execStatus
==
TASK_STATUS__EXECUTING
)
{
break
;
}
else
{
ASSERT
(
0
);
}
}
return
0
;
FAIL:
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
);
return
-
1
;
}
int32_t
streamTaskDispatchDown
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
//
return
0
;
}
int32_t
streamTaskSink
(
SStreamTask
*
pTask
)
{
//
return
0
;
}
...
...
@@ -156,8 +293,8 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
// 2.3. closing: keep trying
while
(
1
)
{
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
const
SArray
*
blocks
=
pBlock
->
blocks
;
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
...
...
@@ -278,7 +415,7 @@ int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
return
0
;
}
int32_t
streamTask
ExecNew
(
SStreamTask
*
pTask
)
{
int32_t
streamTask
Run
(
SStreamTask
*
pTask
)
{
SArray
*
pRes
=
NULL
;
if
(
pTask
->
execType
==
TASK_EXEC__PIPE
||
pTask
->
execType
==
TASK_EXEC__MERGE
)
{
// TODO remove multi runner
...
...
@@ -494,11 +631,16 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
pTask
->
inputQ
=
taosOpenQueue
();
pTask
->
outputQ
=
taosOpenQueue
();
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
)
goto
FAIL
;
pTask
->
inputQAll
=
taosAllocateQall
();
pTask
->
outputQAll
=
taosAllocateQall
();
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
||
pTask
->
inputQAll
==
NULL
||
pTask
->
outputQAll
==
NULL
)
goto
FAIL
;
return
pTask
;
FAIL:
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
if
(
pTask
->
inputQAll
)
taosFreeQall
(
pTask
->
inputQAll
);
if
(
pTask
->
outputQAll
)
taosFreeQall
(
pTask
->
outputQAll
);
if
(
pTask
)
taosMemoryFree
(
pTask
);
return
NULL
;
}
...
...
@@ -507,6 +649,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if
(
tEncodeI64
(
pEncoder
,
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
inputType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
execType
)
<
0
)
return
-
1
;
...
...
@@ -552,6 +695,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
inputType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
execType
)
<
0
)
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录