Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f2468139
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
f2468139
编写于
3月 28, 2022
作者:
L
Liu Jicong
提交者:
GitHub
3月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11060 from taosdata/feature/tq
stream add shuffle dispatcher
上级
8034ea7b
71fabef9
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
175 addition
and
89 deletion
+175
-89
include/common/tmsg.h
include/common/tmsg.h
+3
-1
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+3
-3
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-2
source/dnode/mnode/impl/inc/mndDb.h
source/dnode/mnode/impl/inc/mndDb.h
+1
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+44
-36
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+11
-0
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+111
-47
未找到文件。
include/common/tmsg.h
浏览文件 @
f2468139
...
...
@@ -565,8 +565,10 @@ typedef struct {
SArray
*
pVgroupInfos
;
// Array of SVgroupInfo
}
SUseDbRsp
;
int32_t
tSerializeSUseDbRsp
(
void
*
buf
,
int32_t
bufLen
,
SUseDbRsp
*
pRsp
);
int32_t
tSerializeSUseDbRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SUseDbRsp
*
pRsp
);
int32_t
tDeserializeSUseDbRsp
(
void
*
buf
,
int32_t
bufLen
,
SUseDbRsp
*
pRsp
);
int32_t
tSerializeSUseDbRspImp
(
SCoder
*
pEncoder
,
const
SUseDbRsp
*
pRsp
);
int32_t
tDeserializeSUseDbRspImp
(
SCoder
*
pDecoder
,
SUseDbRsp
*
pRsp
);
void
tFreeSUsedbRsp
(
SUseDbRsp
*
pRsp
);
typedef
struct
{
...
...
include/libs/stream/tstream.h
浏览文件 @
f2468139
...
...
@@ -72,8 +72,9 @@ typedef struct {
}
STaskDispatcherFixedEp
;
typedef
struct
{
int8_t
hashMethod
;
SArray
*
info
;
// int8_t hashMethod;
char
stbFullName
[
TSDB_TABLE_FNAME_LEN
];
SUseDbRsp
dbInfo
;
}
STaskDispatcherShuffle
;
typedef
struct
{
...
...
@@ -135,7 +136,6 @@ typedef struct {
int8_t
sinkType
;
int8_t
dispatchType
;
int16_t
dispatchMsgType
;
int32_t
downstreamTaskId
;
int32_t
nodeId
;
SEpSet
epSet
;
...
...
source/common/src/tmsg.c
浏览文件 @
f2468139
...
...
@@ -1829,7 +1829,7 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) {
return
0
;
}
static
int32_t
tSerializeSUseDbRspImp
(
SCoder
*
pEncoder
,
SUseDbRsp
*
pRsp
)
{
int32_t
tSerializeSUseDbRspImp
(
SCoder
*
pEncoder
,
const
SUseDbRsp
*
pRsp
)
{
if
(
tEncodeCStr
(
pEncoder
,
pRsp
->
db
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pRsp
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
vgVersion
)
<
0
)
return
-
1
;
...
...
@@ -1848,7 +1848,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
return
0
;
}
int32_t
tSerializeSUseDbRsp
(
void
*
buf
,
int32_t
bufLen
,
SUseDbRsp
*
pRsp
)
{
int32_t
tSerializeSUseDbRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SUseDbRsp
*
pRsp
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
source/dnode/mnode/impl/inc/mndDb.h
浏览文件 @
f2468139
...
...
@@ -28,6 +28,7 @@ SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
void
mndReleaseDb
(
SMnode
*
pMnode
,
SDbObj
*
pDb
);
int32_t
mndValidateDbInfo
(
SMnode
*
pMnode
,
SDbVgVersion
*
pDbs
,
int32_t
numOfDbs
,
void
**
ppRsp
,
int32_t
*
pRspLen
);
char
*
mnGetDbStr
(
char
*
src
);
int32_t
mndExtractDbInfo
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SUseDbRsp
*
pRsp
,
const
SUseDbReq
*
pReq
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
f2468139
...
...
@@ -955,7 +955,6 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
sdbCancelFetch
(
pSdb
,
pIter
);
}
static
void
mndBuildDBVgroupInfo
(
SDbObj
*
pDb
,
SMnode
*
pMnode
,
SArray
*
pVgList
)
{
int32_t
vindex
=
0
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
@@ -991,7 +990,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
}
sdbRelease
(
pSdb
,
pVgroup
);
if
(
pDb
&&
(
vindex
>=
pDb
->
cfg
.
numOfVgroups
))
{
break
;
}
...
...
@@ -1000,6 +999,28 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
sdbCancelFetch
(
pSdb
,
pIter
);
}
int32_t
mndExtractDbInfo
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SUseDbRsp
*
pRsp
,
const
SUseDbReq
*
pReq
)
{
pRsp
->
pVgroupInfos
=
taosArrayInit
(
pDb
->
cfg
.
numOfVgroups
,
sizeof
(
SVgroupInfo
));
if
(
pRsp
->
pVgroupInfos
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
int32_t
numOfTable
=
0
;
mndGetDBTableNum
(
pDb
,
pMnode
,
&
numOfTable
);
if
(
pReq
==
NULL
||
pReq
->
vgVersion
<
pDb
->
vgVersion
||
pReq
->
dbId
!=
pDb
->
uid
||
numOfTable
!=
pReq
->
numOfTable
)
{
mndBuildDBVgroupInfo
(
pDb
,
pMnode
,
pRsp
->
pVgroupInfos
);
}
memcpy
(
pRsp
->
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
pRsp
->
uid
=
pDb
->
uid
;
pRsp
->
vgVersion
=
pDb
->
vgVersion
;
pRsp
->
vgNum
=
taosArrayGetSize
(
pRsp
->
pVgroupInfos
);
pRsp
->
hashMethod
=
pDb
->
hashMethod
;
return
0
;
}
static
int32_t
mndProcessUseDbReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
code
=
-
1
;
...
...
@@ -1023,10 +1044,10 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
USE_DB_OVER
;
}
mndBuildDBVgroupInfo
(
NULL
,
pMnode
,
usedbRsp
.
pVgroupInfos
);
usedbRsp
.
vgVersion
=
vgVersion
++
;
if
(
taosArrayGetSize
(
usedbRsp
.
pVgroupInfos
)
<=
0
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
}
...
...
@@ -1034,7 +1055,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
usedbRsp
.
vgVersion
=
usedbReq
.
vgVersion
;
code
=
0
;
}
usedbRsp
.
vgNum
=
taosArrayGetSize
(
usedbRsp
.
pVgroupInfos
);
usedbRsp
.
vgNum
=
taosArrayGetSize
(
usedbRsp
.
pVgroupInfos
);
// no jump, need to construct rsp
}
else
{
...
...
@@ -1057,24 +1078,10 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
goto
USE_DB_OVER
;
}
usedbRsp
.
pVgroupInfos
=
taosArrayInit
(
pDb
->
cfg
.
numOfVgroups
,
sizeof
(
SVgroupInfo
));
if
(
usedbRsp
.
pVgroupInfos
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
if
(
mndExtractDbInfo
(
pMnode
,
pDb
,
&
usedbRsp
,
&
usedbReq
)
<
0
)
{
goto
USE_DB_OVER
;
}
int32_t
numOfTable
=
0
;
mndGetDBTableNum
(
pDb
,
pMnode
,
&
numOfTable
);
if
(
usedbReq
.
vgVersion
<
pDb
->
vgVersion
||
usedbReq
.
dbId
!=
pDb
->
uid
||
numOfTable
!=
usedbReq
.
numOfTable
)
{
mndBuildDBVgroupInfo
(
pDb
,
pMnode
,
usedbRsp
.
pVgroupInfos
);
}
memcpy
(
usedbRsp
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
usedbRsp
.
uid
=
pDb
->
uid
;
usedbRsp
.
vgVersion
=
pDb
->
vgVersion
;
usedbRsp
.
vgNum
=
taosArrayGetSize
(
usedbRsp
.
pVgroupInfos
);
usedbRsp
.
hashMethod
=
pDb
->
hashMethod
;
code
=
0
;
}
}
...
...
@@ -1138,7 +1145,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
mndReleaseDb
(
pMnode
,
pDb
);
continue
;
}
usedbRsp
.
pVgroupInfos
=
taosArrayInit
(
pDb
->
cfg
.
numOfVgroups
,
sizeof
(
SVgroupInfo
));
if
(
usedbRsp
.
pVgroupInfos
==
NULL
)
{
mndReleaseDb
(
pMnode
,
pDb
);
...
...
@@ -1364,11 +1371,11 @@ static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMet
pSchema
[
cols
].
bytes
=
pShow
->
bytes
[
cols
];
cols
++
;
// pShow->bytes[cols] = 1;
// pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
// strcpy(pSchema[cols].name, "update");
// pSchema[cols].bytes = pShow->bytes[cols];
// cols++;
// pShow->bytes[cols] = 1;
// pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
// strcpy(pSchema[cols].name, "update");
// pSchema[cols].bytes = pShow->bytes[cols];
// cols++;
pMeta
->
numOfColumns
=
cols
;
pShow
->
numOfColumns
=
cols
;
...
...
@@ -1396,14 +1403,15 @@ char *mnGetDbStr(char *src) {
return
pos
;
}
static
char
*
getDataPosition
(
char
*
pData
,
SShowObj
*
pShow
,
int32_t
cols
,
int32_t
rows
,
int32_t
capacityOfRow
)
{
static
char
*
getDataPosition
(
char
*
pData
,
SShowObj
*
pShow
,
int32_t
cols
,
int32_t
rows
,
int32_t
capacityOfRow
)
{
return
pData
+
pShow
->
offset
[
cols
]
*
capacityOfRow
+
pShow
->
bytes
[
cols
]
*
rows
;
}
static
void
dumpDbInfoToPayload
(
char
*
data
,
SDbObj
*
pDb
,
SShowObj
*
pShow
,
int32_t
rows
,
int32_t
rowCapacity
,
int64_t
numOfTables
)
{
static
void
dumpDbInfoToPayload
(
char
*
data
,
SDbObj
*
pDb
,
SShowObj
*
pShow
,
int32_t
rows
,
int32_t
rowCapacity
,
int64_t
numOfTables
)
{
int32_t
cols
=
0
;
char
*
pWrite
=
getDataPosition
(
data
,
pShow
,
cols
,
rows
,
rowCapacity
);
char
*
pWrite
=
getDataPosition
(
data
,
pShow
,
cols
,
rows
,
rowCapacity
);
char
*
name
=
mnGetDbStr
(
pDb
->
name
);
if
(
name
!=
NULL
)
{
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
name
,
pShow
->
bytes
[
cols
]);
...
...
@@ -1497,20 +1505,20 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_
STR_WITH_SIZE_TO_VARSTR
(
pWrite
,
prec
,
2
);
cols
++
;
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
}
static
void
setInformationSchemaDbCfg
(
SDbObj
*
pDbObj
)
{
static
void
setInformationSchemaDbCfg
(
SDbObj
*
pDbObj
)
{
ASSERT
(
pDbObj
!=
NULL
);
strncpy
(
pDbObj
->
name
,
TSDB_INFORMATION_SCHEMA_DB
,
tListLen
(
pDbObj
->
name
));
pDbObj
->
createdTime
=
0
;
pDbObj
->
createdTime
=
0
;
pDbObj
->
cfg
.
numOfVgroups
=
0
;
pDbObj
->
cfg
.
quorum
=
1
;
pDbObj
->
cfg
.
quorum
=
1
;
pDbObj
->
cfg
.
replications
=
1
;
pDbObj
->
cfg
.
update
=
1
;
pDbObj
->
cfg
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
pDbObj
->
cfg
.
update
=
1
;
pDbObj
->
cfg
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
}
static
int32_t
mndRetrieveDbs
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rowsCapacity
)
{
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
f2468139
...
...
@@ -222,8 +222,19 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
/*pTask->sinkType = TASK_SINK__NONE;*/
// dispatch part
pTask
->
dispatchType
=
TASK_DISPATCH__NONE
;
#if 0
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pSdb, pDb);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pDb);
#endif
// exec part
pTask
->
execType
=
TASK_EXEC__MERGE
;
...
...
source/libs/stream/src/tstream.c
浏览文件 @
f2468139
...
...
@@ -16,12 +16,89 @@
#include "tstream.h"
#include "executor.h"
static
int32_t
streamBuildDispatchMsg
(
SStreamTask
*
pTask
,
SArray
*
data
,
SRpcMsg
*
pMsg
,
SEpSet
**
ppEpSet
)
{
SStreamTaskExecReq
req
=
{
.
streamId
=
pTask
->
streamId
,
.
data
=
data
,
};
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
tEncodeSStreamTaskExecReq
(
NULL
,
&
req
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
((
SMsgHead
*
)
buf
)
->
vgId
=
0
;
req
.
taskId
=
pTask
->
inplaceDispatcher
.
taskId
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pTask
->
fixedEpDispatcher
.
nodeId
);
*
ppEpSet
=
&
pTask
->
fixedEpDispatcher
.
epSet
;
req
.
taskId
=
pTask
->
fixedEpDispatcher
.
taskId
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
int32_t
nodeId
=
0
;
// TODO fix tbname issue
char
ctbName
[
TSDB_TABLE_FNAME_LEN
+
22
];
// all groupId must be the same in an array
SSDataBlock
*
pBlock
=
taosArrayGet
(
data
,
0
);
sprintf
(
ctbName
,
"%s:%ld"
,
pTask
->
shuffleDispatcher
.
stbFullName
,
pBlock
->
info
.
groupId
);
// TODO: get hash function by hashMethod
// get groupId, compute hash value
uint32_t
hashValue
=
MurmurHash3_32
(
ctbName
,
strlen
(
ctbName
));
//
// get node
// TODO: optimize search process
SArray
*
vgInfo
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
int32_t
sz
=
taosArrayGetSize
(
vgInfo
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
vgInfo
,
i
);
if
(
hashValue
>=
pVgInfo
->
hashBegin
&&
hashValue
<=
pVgInfo
->
hashEnd
)
{
nodeId
=
pVgInfo
->
vgId
;
*
ppEpSet
=
&
pVgInfo
->
epSet
;
break
;
}
}
ASSERT
(
nodeId
!=
0
);
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
nodeId
);
}
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSStreamTaskExecReq
(
&
abuf
,
&
req
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
pMsg
->
msgType
=
pTask
->
dispatchMsgType
;
return
0
;
}
static
int32_t
streamShuffleDispatch
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SHashObj
*
data
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
data
,
pIter
);
if
(
pIter
==
NULL
)
return
0
;
SArray
*
pData
=
(
SArray
*
)
pIter
;
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
;
if
(
streamBuildDispatchMsg
(
pTask
,
pData
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tmsgSendReq
(
pMsgCb
,
pEpSet
,
&
dispatchMsg
);
}
return
0
;
}
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
)
{
SArray
*
pRes
=
NULL
;
// source
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
&&
pTask
->
sourceType
!=
TASK_SOURCE__SCAN
)
return
0
;
// exec
// TODO: for shuffle dispatcher, merge data by groupId
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
ASSERT
(
workId
<
pTask
->
exec
.
numOfRunners
);
void
*
exec
=
pTask
->
exec
.
runners
[
workId
].
executor
;
...
...
@@ -83,28 +160,13 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
}
// dispatch
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SStreamTaskExecReq
req
=
{
.
streamId
=
pTask
->
streamId
,
.
taskId
=
pTask
->
taskId
,
.
data
=
pRes
,
};
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
tEncodeSStreamTaskExecReq
(
NULL
,
&
req
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildDispatchMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSStreamTaskExecReq
(
&
abuf
,
&
req
);
SRpcMsg
dispatchMsg
=
{
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
,
.
msgType
=
pTask
->
dispatchMsgType
,
};
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_PIPE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_PIPE_EXEC
)
{
...
...
@@ -120,36 +182,38 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SStreamTaskExecReq
req
=
{
.
streamId
=
pTask
->
streamId
,
.
taskId
=
pTask
->
fixedEpDispatcher
.
taskId
,
.
data
=
pRes
,
};
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
tEncodeSStreamTaskExecReq
(
NULL
,
&
req
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildDispatchMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pTask
->
fixedEpDispatcher
.
nodeId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSStreamTaskExecReq
(
&
abuf
,
&
req
);
SRpcMsg
dispatchMsg
=
{
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
,
.
msgType
=
pTask
->
dispatchMsgType
,
};
SEpSet
*
pEpSet
=
&
pTask
->
fixedEpDispatcher
.
epSet
;
tmsgSendReq
(
pMsgCb
,
pEpSet
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
// TODO
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pArray
==
NULL
)
{
return
-
1
;
}
taosHashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
));
}
taosArrayPush
(
pArray
,
pDataBlock
);
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
...
...
@@ -196,7 +260,6 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
downstreamTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
...
...
@@ -225,7 +288,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
shuffleDispatcher
.
hashMethod
)
<
0
)
return
-
1
;
if
(
tSerializeSUseDbRspImp
(
pEncoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
/*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
}
/*tEndEncode(pEncoder);*/
...
...
@@ -242,7 +306,6 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
downstreamTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
...
...
@@ -271,7 +334,8 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
hashMethod
)
<
0
)
return
-
1
;
/*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
if
(
tDeserializeSUseDbRspImp
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
}
/*tEndDecode(pDecoder);*/
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录