Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
460ae120
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
460ae120
编写于
3月 28, 2022
作者:
L
Liu Jicong
提交者:
GitHub
3月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11049 from taosdata/feature/tq
Feature/tq
上级
47fe8bd9
a3b4de66
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
65 addition
and
32 deletion
+65
-32
include/common/tmsgdef.h
include/common/tmsgdef.h
+0
-1
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+7
-2
source/client/src/tmq.c
source/client/src/tmq.c
+1
-1
source/dnode/mgmt/vnode/src/vmMsg.c
source/dnode/mgmt/vnode/src/vmMsg.c
+0
-1
source/dnode/mnode/impl/inc/mndScheduler.h
source/dnode/mnode/impl/inc/mndScheduler.h
+1
-1
source/dnode/mnode/impl/inc/mndStream.h
source/dnode/mnode/impl/inc/mndStream.h
+1
-1
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+9
-8
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+5
-1
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+4
-3
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+3
-3
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+4
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-4
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+3
-2
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+5
-0
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+19
-3
未找到文件。
include/common/tmsgdef.h
浏览文件 @
460ae120
...
...
@@ -192,7 +192,6 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_DEPLOY
,
"vnode-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_EXEC
,
"vnode-task-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_PIPE_EXEC
,
"vnode-task-pipe-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_MERGE_EXEC
,
"vnode-task-merge-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_WRITE_EXEC
,
"vnode-task-write-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
...
...
include/libs/stream/tstream.h
浏览文件 @
460ae120
...
...
@@ -82,8 +82,12 @@ typedef struct {
SHashObj
*
pHash
;
// groupId to tbuid
}
STaskSinkTb
;
typedef
void
FSmaHandle
(
void
*
vnode
,
int64_t
smaId
,
const
SArray
*
data
);
typedef
struct
{
int8_t
reserved
;
int64_t
smaId
;
// following are not applicable to encoder and decoder
FSmaHandle
*
smaHandle
;
}
STaskSinkSma
;
typedef
struct
{
...
...
@@ -156,7 +160,8 @@ typedef struct {
STaskDispatcherShuffle
shuffleDispatcher
;
};
// state storage
// application storage
void
*
ahandle
;
}
SStreamTask
;
...
...
source/client/src/tmq.c
浏览文件 @
460ae120
...
...
@@ -877,7 +877,7 @@ WRITE_QUEUE_FAIL:
}
bool
tmqUpdateEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
SMqCMGetSubEpRsp
*
pRsp
)
{
printf
(
"call update ep %d
\n
"
,
epoch
);
/*printf("call update ep %d\n", epoch);*/
bool
set
=
false
;
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
topics
);
SArray
*
newTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
...
...
source/dnode/mgmt/vnode/src/vmMsg.c
浏览文件 @
460ae120
...
...
@@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CONSUME
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY_HEARTBEAT
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_EXEC
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_PIPE_EXEC
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_MERGE_EXEC
,
(
NodeMsgFp
)
vmProcessMergeMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_STREAM_TRIGGER
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
...
...
source/dnode/mnode/impl/inc/mndScheduler.h
浏览文件 @
460ae120
...
...
@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t
mndSchedInitSubEp
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
);
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
);
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
,
int64_t
smaId
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndStream.h
浏览文件 @
460ae120
...
...
@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw
*
mndStreamActionEncode
(
SStreamObj
*
pStream
);
SSdbRow
*
mndStreamActionDecode
(
SSdbRaw
*
pRaw
);
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
);
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
,
int64_t
smaId
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
460ae120
...
...
@@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeI32
(
pEncoder
,
sz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pArray
=
taosArrayGet
(
pObj
->
tasks
,
i
);
SArray
*
pArray
=
taosArrayGet
P
(
pObj
->
tasks
,
i
);
int32_t
innerSz
=
taosArrayGetSize
(
pArray
);
if
(
tEncodeI32
(
pEncoder
,
innerSz
)
<
0
)
return
-
1
;
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGet
(
pArray
,
j
);
SStreamTask
*
pTask
=
taosArrayGet
P
(
pArray
,
j
);
if
(
tEncodeSStreamTask
(
pEncoder
,
pTask
)
<
0
)
return
-
1
;
}
}
...
...
@@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t
sz
;
if
(
tDecodeI32
(
pDecoder
,
&
sz
)
<
0
)
return
-
1
;
if
(
sz
!=
0
)
{
pObj
->
tasks
=
taosArrayInit
(
sz
,
sizeof
(
SArray
));
pObj
->
tasks
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
innerSz
;
if
(
tDecodeI32
(
pDecoder
,
&
innerSz
)
<
0
)
return
-
1
;
SArray
*
pArray
=
taosArrayInit
(
innerSz
,
sizeof
(
SStreamTask
));
SArray
*
pArray
=
taosArrayInit
(
innerSz
,
sizeof
(
void
*
));
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
task
;
if
(
tDecodeSStreamTask
(
pDecoder
,
&
task
)
<
0
)
return
-
1
;
taosArrayPush
(
pArray
,
&
task
);
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
return
-
1
;
if
(
tDecodeSStreamTask
(
pDecoder
,
pTask
)
<
0
)
return
-
1
;
taosArrayPush
(
pArray
,
&
pTask
);
}
taosArrayPush
(
pObj
->
tasks
,
pArray
);
taosArrayPush
(
pObj
->
tasks
,
&
pArray
);
}
}
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
460ae120
...
...
@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return
pVgroup
;
}
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
)
{
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
,
int64_t
smaId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SQueryPlan
*
pPlan
=
qStringToQueryPlan
(
pStream
->
physicalPlan
);
if
(
pPlan
==
NULL
)
{
...
...
@@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// only for inplace
pTask
->
sinkType
=
TASK_SINK__SHOW
;
pTask
->
showSink
.
reserved
=
0
;
if
(
smaId
!=
-
1
)
{
pTask
->
sinkType
=
TASK_SINK__SMA
;
pTask
->
smaSink
.
smaId
=
smaId
;
}
}
else
{
pTask
->
sinkType
=
TASK_SINK__NONE
;
}
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
460ae120
...
...
@@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {}
static
SSdbRaw
*
mndSmaActionEncode
(
SSmaObj
*
pSma
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
size
=
sizeof
(
SSmaObj
)
+
pSma
->
exprLen
+
pSma
->
tagsFilterLen
+
pSma
->
sqlLen
+
pSma
->
astLen
+
TSDB_SMA_RESERVE_SIZE
;
int32_t
size
=
sizeof
(
SSmaObj
)
+
pSma
->
exprLen
+
pSma
->
tagsFilterLen
+
pSma
->
sqlLen
+
pSma
->
astLen
+
TSDB_SMA_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_SMA
,
TSDB_SMA_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
_OVER
;
...
...
@@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if
(
mndSetCreateSmaRedoLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaCommitLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
,
smaObj
.
uid
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
...
...
@@ -491,7 +492,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
mError
(
"sma:%s, failed to create since stb:%s not exist"
,
createReq
.
name
,
createReq
.
stb
);
goto
_OVER
;
}
pStream
=
mndAcquireStream
(
pMnode
,
createReq
.
name
);
if
(
pStream
!=
NULL
)
{
mError
(
"sma:%s, failed to create since stream:%s already exist"
,
createReq
.
name
,
createReq
.
name
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
460ae120
...
...
@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
return
code
;
}
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
)
{
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
,
int64_t
smaId
)
{
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
ast
,
&
pAst
)
<
0
)
{
...
...
@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return
-
1
;
}
if
(
mndScheduleStream
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
if
(
mndScheduleStream
(
pMnode
,
pTrans
,
pStream
,
smaId
)
<
0
)
{
mError
(
"stream:%ld, schedule stream since %s"
,
pStream
->
uid
,
terrstr
());
return
-
1
;
}
...
...
@@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
}
mDebug
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
pCreate
->
name
);
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
)
!=
0
)
{
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
,
-
1
)
!=
0
)
{
mError
(
"trans:%d, failed to add stream since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
460ae120
...
...
@@ -198,10 +198,13 @@ int tqCommit(STQ*);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
SRpcMsg
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
);
// sma
void
smaHandleRes
(
SVnode
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
460ae120
...
...
@@ -476,6 +476,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if
(
tqExpandTask
(
pTq
,
pTask
,
4
)
<
0
)
{
ASSERT
(
0
);
}
pTask
->
ahandle
=
pTq
->
pVnode
;
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
...
...
@@ -497,11 +498,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
return
0
;
}
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
SRpcMsg
*
msg
)
{
char
*
msgstr
=
POINTER_SHIFT
(
msg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTaskExecReq
req
;
tDecodeSStreamTaskExecReq
(
msg
str
,
&
req
);
tDecodeSStreamTaskExecReq
(
msg
,
&
req
);
int32_t
taskId
=
req
.
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
460ae120
...
...
@@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in fetch queue is processing"
);
char
*
msgstr
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_FETCH
:
return
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
...
...
@@ -65,10 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return
vnodeGetTableMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_VND_TASK_EXEC
:
case
TDMT_VND_TASK_PIPE_EXEC
:
case
TDMT_VND_TASK_MERGE_EXEC
:
return
tqProcessTaskExec
(
pVnode
->
pTq
,
pMsg
);
return
tqProcessTaskExec
(
pVnode
->
pTq
,
msgstr
,
msgLen
);
case
TDMT_VND_STREAM_TRIGGER
:
return
tqProcessStreamTrigger
(
pVnode
->
pTq
,
pMsg
->
pCont
,
pMsg
->
contLen
);
case
TDMT_VND_QUERY_HEARTBEAT
:
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
460ae120
...
...
@@ -178,6 +178,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
case
TDMT_VND_TASK_WRITE_EXEC
:
{
if
(
tqProcessTaskExec
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
#if 0
SSmaCfg vCreateSmaReq = {0};
...
...
source/libs/stream/src/tstream.c
浏览文件 @
460ae120
...
...
@@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
//
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaHandle
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
//
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
//
...
...
@@ -205,9 +206,16 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
!=
TASK_SINK__NONE
)
{
// TODO: wrap
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tEncodeI64
(
pEncoder
,
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
fetchSink
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SHOW
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
showSink
.
reserved
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
...
...
@@ -244,8 +252,16 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
!=
TASK_SINK__NON
E
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__TABL
E
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
fetchSink
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SHOW
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
showSink
.
reserved
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录