Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
71984b42
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
71984b42
编写于
5月 20, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(stream): exec
上级
8b067c6c
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
149 addition
and
64 deletion
+149
-64
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+7
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+57
-7
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+0
-10
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+85
-47
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
71984b42
...
...
@@ -107,6 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
if
(
ref
==
0
)
{
taosMemoryFree
(
pDataSubmit
->
data
);
taosMemoryFree
(
pDataSubmit
->
dataRef
);
taosFreeQitem
(
pDataSubmit
);
}
}
...
...
@@ -279,6 +280,12 @@ typedef struct {
SArray
*
res
;
// SArray<SSDataBlock>
}
SStreamSinkReq
;
typedef
struct
{
SMsgHead
head
;
int64_t
streamId
;
int32_t
taskId
;
}
SStreamTaskRunReq
;
int32_t
streamEnqueueDataSubmit
(
SStreamTask
*
pTask
,
SStreamDataSubmit
*
input
);
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
);
int32_t
streamDequeueOutput
(
SStreamTask
*
pTask
,
void
**
output
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
71984b42
...
...
@@ -112,19 +112,18 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
if
(
pIter
==
NULL
)
break
;
pExec
=
(
STqExec
*
)
pIter
;
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
if
(
isAdd
)
{
continue
;
}
else
{
if
(
!
isAdd
)
{
int32_t
sz
=
taosArrayGetSize
(
tbUidList
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pExec
->
pDropTbUid
,
&
tbUid
,
sizeof
(
int64_t
),
NULL
,
0
);
}
}
}
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
task
[
i
],
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
else
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
task
[
i
],
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
}
}
return
0
;
...
...
@@ -1059,6 +1058,57 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo
return
0
;
}
int32_t
tqProcessStreamTriggerNew
(
STQ
*
pTq
,
SSubmitReq
*
data
)
{
SStreamDataSubmit
*
pSubmit
=
NULL
;
// build data
pSubmit
=
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
if
(
pSubmit
==
NULL
)
return
-
1
;
pSubmit
->
dataRef
=
taosMemoryMalloc
(
sizeof
(
int32_t
));
if
(
pSubmit
->
dataRef
==
NULL
)
goto
FAIL
;
*
pSubmit
->
dataRef
=
1
;
pSubmit
->
data
=
data
;
pSubmit
->
type
=
STREAM_INPUT__DATA_BLOCK
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
pTask
->
inputType
==
TASK_INPUT_TYPE__SUMBIT_BLOCK
)
{
streamEnqueueDataSubmit
(
pTask
,
pSubmit
);
// TODO cal back pressure
}
// check run
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
status
);
if
(
execStatus
==
TASK_STATUS__IDLE
||
execStatus
==
TASK_STATUS__CLOSING
)
{
SStreamTaskRunReq
*
pReq
=
taosMemoryMalloc
(
sizeof
(
SStreamTaskRunReq
));
if
(
pReq
==
NULL
)
continue
;
// TODO: do we need htonl?
pReq
->
head
.
vgId
=
pTq
->
pVnode
->
config
.
vgId
;
pReq
->
streamId
=
pTask
->
streamId
;
pReq
->
taskId
=
pTask
->
taskId
;
SRpcMsg
msg
=
{
.
msgType
=
0
,
.
pCont
=
pReq
,
.
contLen
=
sizeof
(
SStreamTaskRunReq
),
};
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
FETCH_QUEUE
,
&
msg
);
}
}
streamDataSubmitRefDec
(
pSubmit
);
return
0
;
FAIL:
if
(
pSubmit
)
{
if
(
pSubmit
->
dataRef
)
{
taosMemoryFree
(
pSubmit
->
dataRef
);
}
taosFreeQitem
(
pSubmit
);
}
return
-
1
;
}
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
)
{
SStreamTaskExecReq
req
;
tDecodeSStreamTaskExecReq
(
msg
,
&
req
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
71984b42
...
...
@@ -34,21 +34,11 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
int32_t
tqReadHandleSetMsg
(
STqReadHandle
*
pReadHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
)
{
pReadHandle
->
pMsg
=
pMsg
;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate and convert
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pReadHandle
->
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
if
(
tGetSubmitMsgNext
(
&
pReadHandle
->
msgIter
,
&
pReadHandle
->
pBlock
)
<
0
)
return
-
1
;
if
(
pReadHandle
->
pBlock
==
NULL
)
break
;
// pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
// pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
// pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
// pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
// pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
// pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
}
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pReadHandle
->
msgIter
)
<
0
)
return
-
1
;
...
...
source/libs/stream/src/tstream.c
浏览文件 @
71984b42
...
...
@@ -247,6 +247,19 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
void
*
data
=
NULL
;
taosGetQitem
(
pTask
->
inputQAll
,
&
data
);
if
(
data
==
NULL
)
break
;
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
taosFreeQitem
(
data
);
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SStreamDataBlock
*
resQ
=
taosAllocateQitem
(
sizeof
(
void
**
),
DEF_QITEM
);
resQ
->
type
=
STREAM_INPUT__DATA_BLOCK
;
resQ
->
blocks
=
pRes
;
taosWriteQitem
(
pTask
->
outputQ
,
resQ
);
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
goto
FAIL
;
}
}
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
);
...
...
@@ -298,62 +311,66 @@ int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
}
// dispatch
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
// TODO dispatch guard
int8_t
outputStatus
=
atomic_load_8
(
&
pTask
->
outputStatus
);
if
(
outputStatus
==
TASK_OUTPUT_STATUS__NORMAL
)
{
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_PIPE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_PIPE_EXEC
)
{
qType
=
FETCH_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_MERGE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_MERGE_EXEC
)
{
qType
=
MERGE_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_WRITE_EXEC
)
{
qType
=
WRITE_QUEUE
;
}
else
{
ASSERT
(
0
);
}
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_PIPE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_PIPE_EXEC
)
{
qType
=
FETCH_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_MERGE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_MERGE_EXEC
)
{
qType
=
MERGE_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_WRITE_EXEC
)
{
qType
=
WRITE_QUEUE
;
}
else
{
ASSERT
(
0
);
}
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
return
-
1
;
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pArray
==
NULL
)
{
return
-
1
;
}
taosHashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
));
}
taos
HashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
)
);
taos
ArrayPush
(
pArray
,
pDataBlock
);
}
taosArrayPush
(
pArray
,
pDataBlock
);
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
}
}
}
return
0
;
...
...
@@ -406,11 +423,32 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream
return
0
;
}
int32_t
streamTaskProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
pRsp
->
inputStatus
);
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO: init recover timer
}
// continue dispatch
streamTaskSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
streamTaskExec2
(
pTask
,
pMsgCb
);
streamTaskSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamTaskProcessRecoverReq
(
SStreamTask
*
pTask
,
char
*
msg
)
{
//
return
0
;
}
int32_t
streamTaskProcessRecoverRsp
(
SStreamTask
*
pTask
,
char
*
msg
)
{
//
return
0
;
}
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
)
{
SArray
*
pRes
=
NULL
;
// source
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录