Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
be8fd9e4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
be8fd9e4
编写于
5月 30, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(stream): create additional task for history data processing.
上级
2977a4d0
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
301 addition
and
145 deletion
+301
-145
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+2
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+2
-2
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+267
-142
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+30
-0
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
be8fd9e4
...
...
@@ -289,9 +289,10 @@ struct SStreamTask {
SCheckpointInfo
chkInfo
;
STaskExec
exec
;
int8_t
fillHistory
;
// fill history
int64_t
ekey
;
// end ts key
int64_t
endVer
;
// end version
SStreamId
historyTaskId
;
// children info
SArray
*
childEpInfo
;
// SArray<SStreamChildEpInfo*>
int32_t
nextCheckId
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
be8fd9e4
...
...
@@ -677,8 +677,8 @@ typedef struct {
char
*
physicalPlan
;
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
SArray
*
p
BatchTask
;
// generate the results for already stored ts data
int64_t
batc
hTaskUid
;
// stream task for history ts data
SArray
*
p
HTasksList
;
// generate the results for already stored ts data
int64_t
hTaskUid
;
// stream task for history ts data
SSchemaWrapper
outputSchema
;
SSchemaWrapper
tagSchema
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
be8fd9e4
...
...
@@ -169,7 +169,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
return
pObj
;
}
int32_t
mndAssignTaskToSnode
(
SMnode
*
pMnode
,
SStreamTask
*
pTask
,
SSubplan
*
plan
,
const
SSnodeObj
*
pSnode
)
{
int32_t
mndAssign
Stream
TaskToSnode
(
SMnode
*
pMnode
,
SStreamTask
*
pTask
,
SSubplan
*
plan
,
const
SSnodeObj
*
pSnode
)
{
int32_t
msgLen
;
pTask
->
nodeId
=
SNODE_HANDLE
;
...
...
@@ -242,17 +242,16 @@ static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStr
return
0
;
}
static
int32_t
addSourceStreamTask
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pTaskList
,
SArray
*
pSinkTaskList
,
SStreamObj
*
pStream
,
SS
ubplan
*
plan
,
uint64_t
uid
,
int8_t
taskLevel
,
int8_t
fillHistory
,
static
int32_t
addSourceStreamTask
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pTaskList
,
SArray
*
pSinkTaskList
,
SS
treamObj
*
pStream
,
SSubplan
*
plan
,
uint64_t
uid
,
int8_t
fillHistory
,
bool
hasExtraSink
)
{
SStreamTask
*
pTask
=
tNewStreamTask
(
uid
,
taskLevel
,
fillHistory
,
pStream
->
conf
.
triggerParam
,
pTaskList
);
SStreamTask
*
pTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__SOURCE
,
fillHistory
,
pStream
->
conf
.
triggerParam
,
pTaskList
);
if
(
pTask
==
NULL
)
{
return
terrno
;
}
// sink or dispatch
if
(
hasExtraSink
)
{
mndAddDispatcherForInnerTask
(
pMnode
,
pStream
,
pSinkTaskList
,
pTask
);
}
else
{
mndSetSinkTaskInfo
(
pStream
,
pTask
);
...
...
@@ -290,7 +289,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
pDstTask
->
dispatchMsgType
=
TDMT_STREAM_TASK_DISPATCH
;
}
int32_t
appendToDownstream
(
SStreamTask
*
pTask
,
SStreamTask
*
pDownstream
)
{
int32_t
setEpToDownstreamTask
(
SStreamTask
*
pTask
,
SStreamTask
*
pDownstream
)
{
SStreamChildEpInfo
*
pEpInfo
=
createStreamTaskEpInfo
(
pTask
);
if
(
pEpInfo
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -304,182 +303,312 @@ int32_t appendToDownstream(SStreamTask* pTask, SStreamTask* pDownstream) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
doScheduleStream
(
uint64_t
uid
,
SArray
*
pTasksList
,
SStreamObj
*
pStream
,
SMnode
*
pMnode
,
SQueryPlan
*
pPlan
)
{
static
SArray
*
addNewTaskList
(
SArray
*
pTasksList
)
{
SArray
*
pTaskList
=
taosArrayInit
(
0
,
POINTER_BYTES
);
taosArrayPush
(
pTasksList
,
&
pTaskList
);
return
pTaskList
;
}
// set the history task id
static
void
setHTasksId
(
SArray
*
pTaskList
,
const
SArray
*
pHTaskList
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTaskList
);
++
i
)
{
SStreamTask
*
pStreamTask
=
taosArrayGet
(
pTaskList
,
i
);
SStreamTask
*
pHTask
=
taosArrayGet
(
pHTaskList
,
i
);
pStreamTask
->
historyTaskId
.
taskId
=
pHTask
->
id
.
taskId
;
pStreamTask
->
historyTaskId
.
streamId
=
pHTask
->
id
.
streamId
;
}
}
static
int32_t
addSourceTasksForSingleLevelStream
(
SMnode
*
pMnode
,
const
SQueryPlan
*
pPlan
,
SStreamObj
*
pStream
,
bool
hasExtraSink
)
{
// create exec stream task, since only one level, the exec task is also the source task
SArray
*
pTaskList
=
addNewTaskList
(
pStream
->
tasks
);
SArray
*
pHTaskList
=
NULL
;
if
(
pStream
->
conf
.
fillHistory
)
{
pHTaskList
=
addNewTaskList
(
pStream
->
pHTasksList
);
}
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
planTotLevel
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
SNodeListNode
*
inner
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
if
(
LIST_LENGTH
(
inner
->
pNodeList
)
!=
1
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
bool
hasExtraSink
=
false
;
bool
externalTargetDB
=
strcmp
(
pStream
->
sourceDb
,
pStream
->
targetDb
)
!=
0
;
SDbObj
*
pDbObj
=
mndAcquireDb
(
pMnode
,
pStream
->
targetDb
);
if
(
pDbObj
==
NULL
)
{
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
inner
->
pNodeList
,
0
);
if
(
plan
->
subplanType
!=
SUBPLAN_TYPE_SCAN
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
bool
multiTarget
=
(
pDbObj
->
cfg
.
numOfVgroups
>
1
);
sdbRelease
(
pSdb
,
pDbObj
);
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
{
break
;
}
if
(
planTotLevel
==
2
||
externalTargetDB
||
multiTarget
||
pStream
->
fixedSinkVgId
)
{
SArray
*
taskOneLevel
=
taosArrayInit
(
0
,
POINTER_BYTES
);
taosArrayPush
(
pTasksList
,
&
taskOneLevel
);
if
(
!
mndVgroupInDb
(
pVgroup
,
pStream
->
sourceDbUid
))
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
// add extra sink
hasExtraSink
=
true
;
if
(
pStream
->
fixedSinkVgId
==
0
)
{
if
(
mndAddShuffleSinkTasksToStream
(
pMnode
,
taskOneLevel
,
pStream
)
<
0
)
{
// TODO free
return
-
1
;
}
}
else
{
if
(
mndAddSinkTaskToStream
(
pStream
,
taskOneLevel
,
pMnode
,
pStream
->
fixedSinkVgId
,
&
pStream
->
fixedSinkVg
)
<
0
)
{
// TODO free
return
-
1
;
}
// new stream task
SArray
**
pSinkTaskList
=
taosArrayGet
(
pStream
->
tasks
,
SINK_NODE_LEVEL
);
int32_t
code
=
addSourceStreamTask
(
pMnode
,
pVgroup
,
pTaskList
,
*
pSinkTaskList
,
pStream
,
plan
,
pStream
->
uid
,
pStream
->
conf
.
fillHistory
,
hasExtraSink
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
if
(
pStream
->
conf
.
fillHistory
)
{
SArray
**
pHSinkTaskList
=
taosArrayGet
(
pStream
->
pHTasksList
,
SINK_NODE_LEVEL
);
code
=
addSourceStreamTask
(
pMnode
,
pVgroup
,
pHTaskList
,
*
pHSinkTaskList
,
pStream
,
plan
,
pStream
->
hTaskUid
,
0
,
hasExtraSink
);
setHTasksId
(
pTaskList
,
pHTaskList
);
}
sdbRelease
(
pSdb
,
pVgroup
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
doAddSourceTask
(
SArray
*
pTaskList
,
int8_t
fillHistory
,
int64_t
uid
,
SStreamTask
*
pDownstreamTask
,
SMnode
*
pMnode
,
SSubplan
*
pPlan
,
SVgObj
*
pVgroup
)
{
SStreamTask
*
pTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__SOURCE
,
fillHistory
,
0
,
pTaskList
);
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pStream
->
totalLevel
=
planTotLevel
+
hasExtraSink
;
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo
(
pTask
,
pDownstreamTask
);
if
(
mndAssignStreamTaskToVgroup
(
pMnode
,
pTask
,
pPlan
,
pVgroup
)
<
0
)
{
return
-
1
;
}
if
(
planTotLevel
>
1
)
{
SStreamTask
*
pInnerTask
;
// inner level
{
SArray
*
taskInnerLevel
=
taosArrayInit
(
0
,
POINTER_BYTES
);
taosArrayPush
(
pTasksList
,
&
taskInnerLevel
);
SNodeListNode
*
inner
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
inner
->
pNodeList
,
0
);
if
(
plan
->
subplanType
!=
SUBPLAN_TYPE_MERGE
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
return
setEpToDownstreamTask
(
pTask
,
pDownstreamTask
);
}
pInnerTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__AGG
,
pStream
->
conf
.
fillHistory
,
pStream
->
conf
.
triggerParam
,
taskInnerLevel
);
if
(
pInnerTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
static
int32_t
doAddAggTask
(
uint64_t
uid
,
SArray
*
pTaskList
,
SArray
*
pSinkNodeList
,
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SStreamTask
**
pAggTask
)
{
*
pAggTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__AGG
,
pStream
->
conf
.
fillHistory
,
pStream
->
conf
.
triggerParam
,
pTaskList
);
if
(
*
pAggTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
// dispatch
SArray
*
pSinkNodeList
=
taosArrayGet
(
pTasksList
,
SINK_NODE_LEVEL
);
if
(
mndAddDispatcherForInnerTask
(
pMnode
,
pStream
,
pSinkNodeList
,
pInnerTask
)
<
0
)
{
return
-
1
;
}
// dispatch
if
(
mndAddDispatcherForInnerTask
(
pMnode
,
pStream
,
pSinkNodeList
,
*
pAggTask
)
<
0
)
{
return
-
1
;
}
if
(
tsDeployOnSnode
)
{
SSnodeObj
*
pSnode
=
mndSchedFetchOneSnode
(
pMnode
);
if
(
pSnode
==
NULL
)
{
SVgObj
*
pVgroup
=
mndSchedFetchOneVg
(
pMnode
,
pStream
->
sourceDbUid
);
if
(
mndAssignStreamTaskToVgroup
(
pMnode
,
pInnerTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
else
{
if
(
mndAssignTaskToSnode
(
pMnode
,
pInnerTask
,
plan
,
pSnode
)
<
0
)
{
sdbRelease
(
pSdb
,
pSnode
);
return
-
1
;
}
}
}
else
{
SVgObj
*
pVgroup
=
mndSchedFetchOneVg
(
pMnode
,
pStream
->
sourceDbUid
);
if
(
mndAssignStreamTaskToVgroup
(
pMnode
,
pInnerTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
return
0
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
}
static
int32_t
addAggTask
(
SStreamObj
*
pStream
,
SMnode
*
pMnode
,
SQueryPlan
*
pPlan
)
{
SArray
*
pAggTaskList
=
addNewTaskList
(
pStream
->
tasks
);
SSdb
*
pSdb
=
pMnode
->
pSdb
;
// source level
SArray
*
taskSourceLevel
=
taosArrayInit
(
0
,
POINTER_BYTES
);
taosArrayPush
(
pTasksList
,
&
taskSourceLevel
);
SNodeListNode
*
inner
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
inner
->
pNodeList
,
0
);
if
(
plan
->
subplanType
!=
SUBPLAN_TYPE_MERGE
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
SNodeListNode
*
inner
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
1
);
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
inner
->
pNodeList
,
0
);
if
(
plan
->
subplanType
!=
SUBPLAN_TYPE_SCAN
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
SStreamTask
*
pAggTask
=
NULL
;
SArray
*
pSinkNodeList
=
taosArrayGet
(
pStream
->
tasks
,
SINK_NODE_LEVEL
);
int32_t
code
=
doAddAggTask
(
pStream
->
uid
,
pAggTaskList
,
pSinkNodeList
,
pMnode
,
pStream
,
&
pAggTask
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
SVgObj
*
pVgroup
=
NULL
;
SSnodeObj
*
pSnode
=
NULL
;
if
(
tsDeployOnSnode
)
{
pSnode
=
mndSchedFetchOneSnode
(
pMnode
);
if
(
pSnode
==
NULL
)
{
pVgroup
=
mndSchedFetchOneVg
(
pMnode
,
pStream
->
sourceDbUid
);
}
}
else
{
pVgroup
=
mndSchedFetchOneVg
(
pMnode
,
pStream
->
sourceDbUid
);
}
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
{
break
;
}
if
(
pSnode
!=
NULL
)
{
code
=
mndAssignStreamTaskToSnode
(
pMnode
,
pAggTask
,
plan
,
pSnode
);
}
else
{
code
=
mndAssignStreamTaskToVgroup
(
pMnode
,
pAggTask
,
plan
,
pVgroup
);
}
if
(
!
mndVgroupInDb
(
pVgroup
,
pStream
->
sourceDbUid
))
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
if
(
pStream
->
conf
.
fillHistory
)
{
SArray
*
pHAggTaskList
=
addNewTaskList
(
pStream
->
pHTasksList
);
SStreamTask
*
pTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__SOURCE
,
pStream
->
conf
.
fillHistory
,
0
,
taskSourceLevel
);
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
SStreamTask
*
pHAggTask
=
NULL
;
code
=
doAddAggTask
(
pStream
->
uid
,
pAggTaskList
,
pSinkNodeList
,
pMnode
,
pStream
,
&
pHAggTask
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pSnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
pSnode
);
}
else
{
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
return
code
;
}
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo
(
pTask
,
pInnerTask
);
if
(
pSnode
!=
NULL
)
{
code
=
mndAssignStreamTaskToSnode
(
pMnode
,
pHAggTask
,
plan
,
pSnode
);
}
else
{
code
=
mndAssignStreamTaskToVgroup
(
pMnode
,
pHAggTask
,
plan
,
pVgroup
);
}
if
(
mndAssignStreamTaskToVgroup
(
pMnode
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
setHTasksId
(
pAggTaskList
,
pHAggTaskList
);
}
if
(
pSnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
pSnode
);
}
else
{
sdbRelease
(
pSdb
,
pVgroup
);
}
return
code
;
}
static
int32_t
addSourceTasksForMultiLevelStream
(
SMnode
*
pMnode
,
SQueryPlan
*
pPlan
,
SStreamObj
*
pStream
,
SStreamTask
*
pDownstreamTask
,
SStreamTask
*
pHDownstreamTask
)
{
SArray
*
pSourceTaskList
=
addNewTaskList
(
pStream
->
tasks
);
SArray
*
pHSourceTaskList
=
NULL
;
if
(
pStream
->
conf
.
fillHistory
)
{
pHSourceTaskList
=
addNewTaskList
(
pStream
->
pHTasksList
);
}
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SNodeListNode
*
inner
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
1
);
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
inner
->
pNodeList
,
0
);
if
(
plan
->
subplanType
!=
SUBPLAN_TYPE_SCAN
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
{
break
;
}
int32_t
code
=
appendToDownstream
(
pTask
,
pInnerTask
);
if
(
!
mndVgroupInDb
(
pVgroup
,
pStream
->
sourceDbUid
))
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
int32_t
code
=
doAddSourceTask
(
pSourceTaskList
,
pStream
->
conf
.
fillHistory
,
pStream
->
uid
,
pDownstreamTask
,
pMnode
,
plan
,
pVgroup
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbRelease
(
pSdb
,
pVgroup
);
terrno
=
code
;
return
-
1
;
}
if
(
pStream
->
conf
.
fillHistory
)
{
code
=
doAddSourceTask
(
pHSourceTaskList
,
0
,
pStream
->
hTaskUid
,
pHDownstreamTask
,
pMnode
,
plan
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
-
1
;
return
code
;
}
setHTasksId
(
pSourceTaskList
,
pHSourceTaskList
);
}
}
else
if
(
planTotLevel
==
1
)
{
// create exec stream task, since only one level, the exec task is also the source task
SArray
*
pTaskList
=
taosArrayInit
(
0
,
POINTER_BYTES
);
taosArrayPush
(
pTasksList
,
&
pTaskList
);
}
SNodeListNode
*
inner
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
if
(
LIST_LENGTH
(
inner
->
pNodeList
)
!=
1
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
addSinkTasks
(
SArray
*
pTasksList
,
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SArray
**
pCreatedTaskList
)
{
SArray
*
pSinkTaskList
=
addNewTaskList
(
pTasksList
);
if
(
pStream
->
fixedSinkVgId
==
0
)
{
if
(
mndAddShuffleSinkTasksToStream
(
pMnode
,
pSinkTaskList
,
pStream
)
<
0
)
{
// TODO free
return
-
1
;
}
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
inner
->
pNodeList
,
0
);
if
(
plan
->
subplanType
!=
SUBPLAN_TYPE_SCAN
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
}
else
{
if
(
mndAddSinkTaskToStream
(
pStream
,
pSinkTaskList
,
pMnode
,
pStream
->
fixedSinkVgId
,
&
pStream
->
fixedSinkVg
)
<
0
)
{
// TODO free
return
-
1
;
}
}
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
{
break
;
}
*
pCreatedTaskList
=
pSinkTaskList
;
return
TSDB_CODE_SUCCESS
;
}
if
(
!
mndVgroupInDb
(
pVgroup
,
pStream
->
sourceDbUid
))
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
static
int32_t
doScheduleStream
(
SStreamObj
*
pStream
,
SMnode
*
pMnode
,
SQueryPlan
*
pPlan
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfPlanLevel
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
// new stream task
SArray
*
pSinkNodeTaskList
=
taosArrayGet
(
pTasksList
,
SINK_NODE_LEVEL
);
int32_t
code
=
addSourceStreamTask
(
pMnode
,
pVgroup
,
pTaskList
,
pSinkNodeTaskList
,
pStream
,
plan
,
uid
,
TASK_LEVEL__SOURCE
,
pStream
->
conf
.
fillHistory
,
hasExtraSink
);
sdbRelease
(
pSdb
,
pVgroup
);
bool
hasExtraSink
=
false
;
bool
externalTargetDB
=
strcmp
(
pStream
->
sourceDb
,
pStream
->
targetDb
)
!=
0
;
SDbObj
*
pDbObj
=
mndAcquireDb
(
pMnode
,
pStream
->
targetDb
);
if
(
pDbObj
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
bool
multiTarget
=
(
pDbObj
->
cfg
.
numOfVgroups
>
1
);
sdbRelease
(
pSdb
,
pDbObj
);
pStream
->
tasks
=
taosArrayInit
(
numOfPlanLevel
+
1
,
POINTER_BYTES
);
pStream
->
pHTasksList
=
taosArrayInit
(
numOfPlanLevel
+
1
,
POINTER_BYTES
);
if
(
numOfPlanLevel
==
2
||
externalTargetDB
||
multiTarget
||
pStream
->
fixedSinkVgId
)
{
// add extra sink
hasExtraSink
=
true
;
SArray
*
pSinkTaskList
=
NULL
;
int32_t
code
=
addSinkTasks
(
pStream
->
tasks
,
pMnode
,
pStream
,
&
pSinkTaskList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// check for fill history
if
(
pStream
->
conf
.
fillHistory
)
{
SArray
*
pHSinkTaskList
=
NULL
;
code
=
addSinkTasks
(
pStream
->
pHTasksList
,
pMnode
,
pStream
,
&
pHSinkTaskList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
return
code
;
}
setHTasksId
(
pSinkTaskList
,
pHSinkTaskList
);
}
}
pStream
->
totalLevel
=
numOfPlanLevel
+
hasExtraSink
;
if
(
numOfPlanLevel
>
1
)
{
SStreamTask
*
pInnerTask
;
int32_t
code
=
addAggTask
(
pStream
,
pMnode
,
pPlan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// source level
return
addSourceTasksForMultiLevelStream
(
pMnode
,
pPlan
,
pStream
,
pInnerTask
,
NULL
);
}
else
if
(
numOfPlanLevel
==
1
)
{
return
addSourceTasksForSingleLevelStream
(
pMnode
,
pPlan
,
pStream
,
hasExtraSink
);
}
return
0
;
}
...
...
@@ -490,13 +619,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return
-
1
;
}
int32_t
code
=
doScheduleStream
(
pStream
->
uid
,
pStream
->
tasks
,
pStream
,
pMnode
,
pPlan
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
code
=
doScheduleStream
(
pStream
->
batchTaskUid
,
pStream
->
pBatchTask
,
pStream
,
pMnode
,
pPlan
);
}
int32_t
code
=
doScheduleStream
(
pStream
,
pMnode
,
pPlan
);
qDestroyQueryPlan
(
pPlan
);
return
code
;
}
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
be8fd9e4
...
...
@@ -428,17 +428,22 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
NULL
,
0
);
tEncodeStreamTask
(
&
encoder
,
pTask
);
int32_t
size
=
encoder
.
pos
;
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
size
;
tEncoderClear
(
&
encoder
);
void
*
buf
=
taosMemoryCalloc
(
1
,
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pTask
->
nodeId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncoderInit
(
&
encoder
,
abuf
,
size
);
tEncodeStreamTask
(
&
encoder
,
pTask
);
tEncoderClear
(
&
encoder
);
...
...
@@ -448,10 +453,12 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_STREAM_TASK_DEPLOY
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
buf
);
return
-
1
;
}
return
0
;
}
...
...
@@ -468,6 +475,25 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
}
}
}
// persistent stream task for history data
if
(
pStream
->
conf
.
fillHistory
)
{
level
=
taosArrayGetSize
(
pStream
->
pHTasksList
);
for
(
int32_t
i
=
0
;
i
<
level
;
i
++
)
{
SArray
*
pLevel
=
taosArrayGetP
(
pStream
->
pHTasksList
,
i
);
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
);
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
j
);
if
(
mndPersistTaskDeployReq
(
pTrans
,
pTask
)
<
0
)
{
return
-
1
;
}
}
}
}
return
0
;
}
...
...
@@ -475,11 +501,13 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if
(
mndPersistStreamTasks
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
return
-
1
;
}
SSdbRaw
*
pCommitRaw
=
mndStreamActionEncode
(
pStream
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
return
0
;
}
...
...
@@ -491,6 +519,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
mndTransDrop
(
pTrans
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
return
0
;
}
...
...
@@ -733,6 +762,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mError
(
"stream:%s, failed to create since %s"
,
createStreamReq
.
name
,
terrstr
());
goto
_OVER
;
}
mInfo
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
createStreamReq
.
name
);
mndTransSetDbName
(
pTrans
,
createStreamReq
.
sourceDB
,
streamObj
.
targetDb
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录