Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
20e1ebfe
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
20e1ebfe
编写于
3月 25, 2022
作者:
L
Liu Jicong
提交者:
GitHub
3月 25, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11003 from taosdata/feature/tq
refactor stream
上级
c92d4323
03b21623
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
272 addition
and
270 deletion
+272
-270
include/common/tmsg.h
include/common/tmsg.h
+1
-166
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+164
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+0
-103
source/dnode/mnode/impl/CMakeLists.txt
source/dnode/mnode/impl/CMakeLists.txt
+1
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/snode/CMakeLists.txt
source/dnode/snode/CMakeLists.txt
+1
-0
source/dnode/snode/inc/sndInt.h
source/dnode/snode/inc/sndInt.h
+1
-0
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+103
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
20e1ebfe
...
...
@@ -207,7 +207,7 @@ typedef struct {
// Submit message for one table
typedef
struct
SSubmitBlk
{
int64_t
uid
;
// table unique id
int64_t
suid
;
// stable id
int64_t
suid
;
// stable id
int32_t
padding
;
// TODO just for padding here
int32_t
sversion
;
// data schema version
int32_t
dataLen
;
// data part length, not including the SSubmitBlk head
...
...
@@ -2358,171 +2358,6 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
}
return
buf
;
}
enum
{
STREAM_TASK_STATUS__RUNNING
=
1
,
STREAM_TASK_STATUS__STOP
,
};
// pipe -> fetch/pipe queue
// merge -> merge queue
// write -> write queue
enum
{
TASK_SINK_MSG__SND_PIPE
=
1
,
TASK_SINK_MSG__SND_MERGE
,
TASK_SINK_MSG__VND_PIPE
,
TASK_SINK_MSG__VND_MERGE
,
TASK_SINK_MSG__VND_WRITE
,
};
typedef
struct
{
int32_t
nodeId
;
// 0 for snode
SEpSet
epSet
;
}
SStreamTaskEp
;
typedef
struct
{
void
*
inputHandle
;
void
*
executor
;
}
SStreamRunner
;
typedef
struct
{
int8_t
parallelizable
;
char
*
qmsg
;
// followings are not applicable to encoder and decoder
int8_t
numOfRunners
;
SStreamRunner
*
runners
;
}
STaskExec
;
typedef
struct
{
int8_t
reserved
;
}
STaskDispatcherInplace
;
typedef
struct
{
int32_t
nodeId
;
SEpSet
epSet
;
}
STaskDispatcherFixedEp
;
typedef
struct
{
int8_t
hashMethod
;
SArray
*
info
;
}
STaskDispatcherShuffle
;
typedef
struct
{
int8_t
reserved
;
// not applicable to encoder and decoder
SHashObj
*
pHash
;
// groupId to tbuid
}
STaskSinkTb
;
typedef
struct
{
int8_t
reserved
;
}
STaskSinkSma
;
typedef
struct
{
int8_t
reserved
;
}
STaskSinkFetch
;
typedef
struct
{
int8_t
reserved
;
}
STaskSinkShow
;
enum
{
TASK_SOURCE__SCAN
=
1
,
TASK_SOURCE__SINGLE
,
TASK_SOURCE__MULTI
,
};
enum
{
TASK_EXEC__NONE
=
1
,
TASK_EXEC__EXEC
,
};
enum
{
TASK_DISPATCH__NONE
=
1
,
TASK_DISPATCH__INPLACE
,
TASK_DISPATCH__FIXED
,
TASK_DISPATCH__SHUFFLE
,
};
enum
{
TASK_SINK__NONE
=
1
,
TASK_SINK__TABLE
,
TASK_SINK__SMA
,
TASK_SINK__FETCH
,
TASK_SINK__SHOW
,
};
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int8_t
status
;
int8_t
sourceType
;
int8_t
execType
;
int8_t
sinkType
;
int8_t
dispatchType
;
int16_t
dispatchMsgType
;
int32_t
downstreamTaskId
;
// source preprocess
// exec
STaskExec
exec
;
// local sink
union
{
STaskSinkTb
tbSink
;
STaskSinkSma
smaSink
;
STaskSinkFetch
fetchSink
;
STaskSinkShow
showSink
;
};
// dispatch
union
{
STaskDispatcherInplace
inplaceDispatcher
;
STaskDispatcherFixedEp
fixedEpDispatcher
;
STaskDispatcherShuffle
shuffleDispatcher
;
};
// state storage
}
SStreamTask
;
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
);
int32_t
tEncodeSStreamTask
(
SCoder
*
pEncoder
,
const
SStreamTask
*
pTask
);
int32_t
tDecodeSStreamTask
(
SCoder
*
pDecoder
,
SStreamTask
*
pTask
);
void
tFreeSStreamTask
(
SStreamTask
*
pTask
);
typedef
struct
{
// SMsgHead head;
SStreamTask
*
task
;
}
SStreamTaskDeployReq
;
typedef
struct
{
int32_t
reserved
;
}
SStreamTaskDeployRsp
;
typedef
struct
{
// SMsgHead head;
int64_t
streamId
;
int32_t
taskId
;
SArray
*
data
;
// SArray<SSDataBlock>
}
SStreamTaskExecReq
;
int32_t
tEncodeSStreamTaskExecReq
(
void
**
buf
,
const
SStreamTaskExecReq
*
pReq
);
void
*
tDecodeSStreamTaskExecReq
(
const
void
*
buf
,
SStreamTaskExecReq
*
pReq
);
void
tFreeSStreamTaskExecReq
(
SStreamTaskExecReq
*
pReq
);
typedef
struct
{
int32_t
reserved
;
}
SStreamTaskExecRsp
;
typedef
struct
{
// SMsgHead head;
int64_t
streamId
;
int64_t
version
;
SArray
*
res
;
// SArray<SSDataBlock>
}
SStreamSinkReq
;
#pragma pack(pop)
#ifdef __cplusplus
...
...
include/libs/stream/tstream.h
浏览文件 @
20e1ebfe
...
...
@@ -25,6 +25,170 @@ extern "C" {
#ifndef _TSTREAM_H_
#define _TSTREAM_H_
enum
{
STREAM_TASK_STATUS__RUNNING
=
1
,
STREAM_TASK_STATUS__STOP
,
};
// pipe -> fetch/pipe queue
// merge -> merge queue
// write -> write queue
enum
{
TASK_SINK_MSG__SND_PIPE
=
1
,
TASK_SINK_MSG__SND_MERGE
,
TASK_SINK_MSG__VND_PIPE
,
TASK_SINK_MSG__VND_MERGE
,
TASK_SINK_MSG__VND_WRITE
,
};
typedef
struct
{
int32_t
nodeId
;
// 0 for snode
SEpSet
epSet
;
}
SStreamTaskEp
;
typedef
struct
{
void
*
inputHandle
;
void
*
executor
;
}
SStreamRunner
;
typedef
struct
{
int8_t
parallelizable
;
char
*
qmsg
;
// followings are not applicable to encoder and decoder
int8_t
numOfRunners
;
SStreamRunner
*
runners
;
}
STaskExec
;
typedef
struct
{
int8_t
reserved
;
}
STaskDispatcherInplace
;
typedef
struct
{
int32_t
nodeId
;
SEpSet
epSet
;
}
STaskDispatcherFixedEp
;
typedef
struct
{
int8_t
hashMethod
;
SArray
*
info
;
}
STaskDispatcherShuffle
;
typedef
struct
{
int8_t
reserved
;
// not applicable to encoder and decoder
SHashObj
*
pHash
;
// groupId to tbuid
}
STaskSinkTb
;
typedef
struct
{
int8_t
reserved
;
}
STaskSinkSma
;
typedef
struct
{
int8_t
reserved
;
}
STaskSinkFetch
;
typedef
struct
{
int8_t
reserved
;
}
STaskSinkShow
;
enum
{
TASK_SOURCE__SCAN
=
1
,
TASK_SOURCE__SINGLE
,
TASK_SOURCE__MULTI
,
};
enum
{
TASK_EXEC__NONE
=
1
,
TASK_EXEC__EXEC
,
};
enum
{
TASK_DISPATCH__NONE
=
1
,
TASK_DISPATCH__INPLACE
,
TASK_DISPATCH__FIXED
,
TASK_DISPATCH__SHUFFLE
,
};
enum
{
TASK_SINK__NONE
=
1
,
TASK_SINK__TABLE
,
TASK_SINK__SMA
,
TASK_SINK__FETCH
,
TASK_SINK__SHOW
,
};
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int8_t
status
;
int8_t
sourceType
;
int8_t
execType
;
int8_t
sinkType
;
int8_t
dispatchType
;
int16_t
dispatchMsgType
;
int32_t
downstreamTaskId
;
// source preprocess
// exec
STaskExec
exec
;
// local sink
union
{
STaskSinkTb
tbSink
;
STaskSinkSma
smaSink
;
STaskSinkFetch
fetchSink
;
STaskSinkShow
showSink
;
};
// dispatch
union
{
STaskDispatcherInplace
inplaceDispatcher
;
STaskDispatcherFixedEp
fixedEpDispatcher
;
STaskDispatcherShuffle
shuffleDispatcher
;
};
// state storage
}
SStreamTask
;
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
);
int32_t
tEncodeSStreamTask
(
SCoder
*
pEncoder
,
const
SStreamTask
*
pTask
);
int32_t
tDecodeSStreamTask
(
SCoder
*
pDecoder
,
SStreamTask
*
pTask
);
void
tFreeSStreamTask
(
SStreamTask
*
pTask
);
typedef
struct
{
// SMsgHead head;
SStreamTask
*
task
;
}
SStreamTaskDeployReq
;
typedef
struct
{
int32_t
reserved
;
}
SStreamTaskDeployRsp
;
typedef
struct
{
// SMsgHead head;
int64_t
streamId
;
int32_t
taskId
;
SArray
*
data
;
// SArray<SSDataBlock>
}
SStreamTaskExecReq
;
int32_t
tEncodeSStreamTaskExecReq
(
void
**
buf
,
const
SStreamTaskExecReq
*
pReq
);
void
*
tDecodeSStreamTaskExecReq
(
const
void
*
buf
,
SStreamTaskExecReq
*
pReq
);
void
tFreeSStreamTaskExecReq
(
SStreamTaskExecReq
*
pReq
);
typedef
struct
{
int32_t
reserved
;
}
SStreamTaskExecRsp
;
typedef
struct
{
// SMsgHead head;
int64_t
streamId
;
int64_t
version
;
SArray
*
res
;
// SArray<SSDataBlock>
}
SStreamSinkReq
;
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
);
#ifdef __cplusplus
...
...
source/common/src/tmsg.c
浏览文件 @
20e1ebfe
...
...
@@ -3094,106 +3094,3 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
tfree
(
pReq
->
sql
);
tfree
(
pReq
->
ast
);
}
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
)
{
SStreamTask
*
pTask
=
(
SStreamTask
*
)
calloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
NULL
;
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
status
=
STREAM_TASK_STATUS__RUNNING
;
/*pTask->qmsg = NULL;*/
return
pTask
;
}
int32_t
tEncodeSStreamTask
(
SCoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if
(
tEncodeI64
(
pEncoder
,
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
execType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
downstreamTaskId
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
==
TASK_EXEC__EXEC
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
!=
TASK_SINK__NONE
)
{
// TODO: wrap
if
(
tEncodeI8
(
pEncoder
,
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
inplaceDispatcher
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
shuffleDispatcher
.
hashMethod
)
<
0
)
return
-
1
;
}
/*tEndEncode(pEncoder);*/
return
pEncoder
->
pos
;
}
int32_t
tDecodeSStreamTask
(
SCoder
*
pDecoder
,
SStreamTask
*
pTask
)
{
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
execType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
downstreamTaskId
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
==
TASK_EXEC__EXEC
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
!=
TASK_SINK__NONE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
inplaceDispatcher
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
hashMethod
)
<
0
)
return
-
1
;
}
/*tEndDecode(pDecoder);*/
return
0
;
}
void
tFreeSStreamTask
(
SStreamTask
*
pTask
)
{
// TODO
/*free(pTask->qmsg);*/
/*free(pTask->executor);*/
/*free(pTask);*/
}
#if 0
int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
/*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/
return pEncoder->size;
}
int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) {
return pEncoder->size;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock);
}
#endif
source/dnode/mnode/impl/CMakeLists.txt
浏览文件 @
20e1ebfe
...
...
@@ -6,7 +6,7 @@ target_include_directories(
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_link_libraries
(
mnode scheduler sdb wal transport cjson sync monitor parser
mnode scheduler sdb wal transport cjson sync monitor
stream
parser
)
if
(
${
BUILD_TEST
}
)
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
20e1ebfe
...
...
@@ -26,6 +26,7 @@
#include "tlog.h"
#include "tmsg.h"
#include "trpc.h"
#include "tstream.h"
#include "ttimer.h"
#include "mnode.h"
...
...
source/dnode/snode/CMakeLists.txt
浏览文件 @
20e1ebfe
...
...
@@ -13,4 +13,5 @@ target_link_libraries(
PRIVATE common
PRIVATE util
PRIVATE qcom
PRIVATE stream
)
source/dnode/snode/inc/sndInt.h
浏览文件 @
20e1ebfe
...
...
@@ -22,6 +22,7 @@
#include "tmsg.h"
#include "tqueue.h"
#include "trpc.h"
#include "tstream.h"
#include "snode.h"
...
...
source/libs/stream/src/tstream.c
浏览文件 @
20e1ebfe
...
...
@@ -143,3 +143,106 @@ void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) {
}
void
tFreeSStreamTaskExecReq
(
SStreamTaskExecReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
data
);
}
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
)
{
SStreamTask
*
pTask
=
(
SStreamTask
*
)
calloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
NULL
;
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
status
=
STREAM_TASK_STATUS__RUNNING
;
/*pTask->qmsg = NULL;*/
return
pTask
;
}
int32_t
tEncodeSStreamTask
(
SCoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if
(
tEncodeI64
(
pEncoder
,
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
execType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
downstreamTaskId
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
==
TASK_EXEC__EXEC
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
!=
TASK_SINK__NONE
)
{
// TODO: wrap
if
(
tEncodeI8
(
pEncoder
,
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
inplaceDispatcher
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
shuffleDispatcher
.
hashMethod
)
<
0
)
return
-
1
;
}
/*tEndEncode(pEncoder);*/
return
pEncoder
->
pos
;
}
int32_t
tDecodeSStreamTask
(
SCoder
*
pDecoder
,
SStreamTask
*
pTask
)
{
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
execType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
downstreamTaskId
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
==
TASK_EXEC__EXEC
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
!=
TASK_SINK__NONE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
inplaceDispatcher
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
hashMethod
)
<
0
)
return
-
1
;
}
/*tEndDecode(pDecoder);*/
return
0
;
}
void
tFreeSStreamTask
(
SStreamTask
*
pTask
)
{
// TODO
/*free(pTask->qmsg);*/
/*free(pTask->executor);*/
/*free(pTask);*/
}
#if 0
int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
/*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/
return pEncoder->size;
}
int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) {
return pEncoder->size;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock);
}
#endif
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录