Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
70cddceb
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
70cddceb
编写于
3月 21, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
assign task to vg
上级
d13bb57d
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
203 addition
and
124 deletion
+203
-124
include/common/tmsg.h
include/common/tmsg.h
+13
-3
source/client/src/tmq.c
source/client/src/tmq.c
+91
-2
source/common/src/tmsg.c
source/common/src/tmsg.c
+31
-8
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+1
-1
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+61
-104
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+2
-2
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+3
-3
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
70cddceb
...
...
@@ -1129,10 +1129,10 @@ typedef struct {
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
outputTbName
[
TSDB_TABLE_NAME_LEN
];
int8_t
igExists
;
char
*
sql
;
char
*
physicalPlan
;
char
*
logicalPlan
;
char
*
ast
;
}
SCMCreateStreamReq
;
typedef
struct
{
...
...
@@ -2273,13 +2273,23 @@ enum {
STREAM_TASK_STATUS__STOP
,
};
typedef
struct
{
void
*
inputHandle
;
void
**
executor
;
}
SStreamTaskParRunner
;
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
level
;
int8_t
status
;
int8_t
pipeEnd
;
int8_t
parallel
;
SEpSet
NextOpEp
;
char
*
qmsg
;
void
*
executor
;
// not applied to encoder and decoder
SStreamTaskParRunner
runner
;
// void* executor;
// void* stateStore;
// storage handle
}
SStreamTask
;
...
...
source/client/src/tmq.c
浏览文件 @
70cddceb
...
...
@@ -456,6 +456,94 @@ _return:
void
tmq_conf_set_offset_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
)
{
conf
->
commit_cb
=
cb
;
}
TAOS_RES
*
tmq_create_stream
(
TAOS
*
taos
,
const
char
*
streamName
,
const
char
*
tbName
,
const
char
*
sql
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQueryNode
=
NULL
;
char
*
astStr
=
NULL
;
int32_t
sqlLen
;
terrno
=
TSDB_CODE_SUCCESS
;
if
(
taos
==
NULL
||
streamName
==
NULL
||
sql
==
NULL
)
{
tscError
(
"invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s"
,
taos
,
streamName
,
sql
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
sqlLen
=
strlen
(
sql
);
if
(
strlen
(
streamName
)
>=
TSDB_TABLE_NAME_LEN
)
{
tscError
(
"stream name too long, max length:%d"
,
TSDB_TABLE_NAME_LEN
-
1
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
if
(
sqlLen
>
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
goto
_return
;
}
tscDebug
(
"start to create stream: %s"
,
streamName
);
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
false
,
&
pQueryNode
),
_return
);
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO
(
nodesNodeToString
(
pQueryNode
->
pRoot
,
false
,
&
astStr
,
NULL
),
_return
);
/*printf("%s\n", pStr);*/
SName
name
=
{.
acctId
=
pTscObj
->
acctId
,
.
type
=
TSDB_TABLE_NAME_T
};
strcpy
(
name
.
dbname
,
pRequest
->
pDb
);
strcpy
(
name
.
tname
,
streamName
);
SCMCreateStreamReq
req
=
{
.
igExists
=
1
,
.
ast
=
astStr
,
.
sql
=
(
char
*
)
sql
,
};
tNameExtractFullName
(
&
name
,
req
.
name
);
strcpy
(
req
.
outputTbName
,
tbName
);
int
tlen
=
tSerializeSCMCreateStreamReq
(
NULL
,
0
,
&
req
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
goto
_return
;
}
tSerializeSCMCreateStreamReq
(
buf
,
tlen
,
&
req
);
/*printf("formatted: %s\n", dagStr);*/
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
,
.
handle
=
NULL
,
};
pRequest
->
type
=
TDMT_MND_CREATE_STREAM
;
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
SEpSet
epSet
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
_return:
tfree
(
astStr
);
qDestroyQuery
(
pQueryNode
);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if
(
pRequest
!=
NULL
&&
terrno
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
terrno
;
}
return
pRequest
;
}
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
topicName
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
SRequestObj
*
pRequest
=
NULL
;
...
...
@@ -481,7 +569,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
goto
_return
;
}
tscDebug
(
"start to create topic
,
%s"
,
topicName
);
tscDebug
(
"start to create topic
:
%s"
,
topicName
);
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
true
,
&
pQueryNode
),
_return
);
...
...
@@ -498,7 +586,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
SCMCreateTopicReq
req
=
{
.
igExists
=
1
,
.
ast
=
(
char
*
)
astStr
,
.
ast
=
astStr
,
.
sql
=
(
char
*
)
sql
,
};
tNameExtractFullName
(
&
name
,
req
.
name
);
...
...
@@ -528,6 +616,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
_return:
tfree
(
astStr
);
qDestroyQuery
(
pQueryNode
);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
...
...
source/common/src/tmsg.c
浏览文件 @
70cddceb
...
...
@@ -2469,7 +2469,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq
int32_t
tDecodeSMqCMCommitOffsetReq
(
SCoder
*
decoder
,
SMqCMCommitOffsetReq
*
pReq
)
{
if
(
tStartDecode
(
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
decoder
,
&
pReq
->
num
)
<
0
)
return
-
1
;
TCODER_MALLOC
(
pReq
->
offsets
,
SMqOffset
*
,
pReq
->
num
*
sizeof
(
SMqOffset
),
decoder
);
TCODER_MALLOC
(
pReq
->
offsets
,
SMqOffset
*
,
pReq
->
num
*
sizeof
(
SMqOffset
),
decoder
);
if
(
pReq
->
offsets
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
num
;
i
++
)
{
tDecodeSMqOffset
(
decoder
,
&
pReq
->
offsets
[
i
]);
...
...
@@ -2653,15 +2653,22 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
}
int32_t
tSerializeSCMCreateStreamReq
(
void
*
buf
,
int32_t
bufLen
,
const
SCMCreateStreamReq
*
pReq
)
{
int32_t
sqlLen
=
0
;
int32_t
astLen
=
0
;
if
(
pReq
->
sql
!=
NULL
)
sqlLen
=
(
int32_t
)
strlen
(
pReq
->
sql
);
if
(
pReq
->
ast
!=
NULL
)
astLen
=
(
int32_t
)
strlen
(
pReq
->
ast
);
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
outputTbName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
physicalPlan
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
logicalPlan
)
<
0
)
return
-
1
;
if
(
sqlLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
if
(
astLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -2670,15 +2677,30 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
}
int32_t
tDeserializeSCMCreateStreamReq
(
void
*
buf
,
int32_t
bufLen
,
SCMCreateStreamReq
*
pReq
)
{
int32_t
sqlLen
=
0
;
int32_t
astLen
=
0
;
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
outputTbName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
&
decoder
,
&
pReq
->
sql
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
&
decoder
,
&
pReq
->
physicalPlan
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
&
decoder
,
&
pReq
->
logicalPlan
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
sqlLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
astLen
)
<
0
)
return
-
1
;
if
(
sqlLen
>
0
)
{
pReq
->
sql
=
calloc
(
1
,
sqlLen
+
1
);
if
(
pReq
->
sql
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
}
if
(
astLen
>
0
)
{
pReq
->
ast
=
calloc
(
1
,
astLen
+
1
);
if
(
pReq
->
ast
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
...
...
@@ -2687,8 +2709,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
void
tFreeSCMCreateStreamReq
(
SCMCreateStreamReq
*
pReq
)
{
tfree
(
pReq
->
sql
);
tfree
(
pReq
->
physicalPlan
);
tfree
(
pReq
->
logicalPlan
);
tfree
(
pReq
->
ast
);
}
int32_t
tEncodeSStreamTask
(
SCoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
...
...
@@ -2697,6 +2718,7 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
qmsg
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
...
...
@@ -2708,6 +2730,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
qmsg
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
70cddceb
...
...
@@ -28,7 +28,7 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void
mndReleaseVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
);
SSdbRaw
*
mndVgroupActionEncode
(
SVgObj
*
pVgroup
);
int32_t
mndAllocVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
);
SEpSet
mndGetVgroupEpset
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
);
SEpSet
mndGetVgroupEpset
(
SMnode
*
pMnode
,
const
SVgObj
*
pVgroup
);
int32_t
mndGetVnodesNum
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
void
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
70cddceb
...
...
@@ -31,6 +31,53 @@
#include "tname.h"
#include "tuuid.h"
int32_t
mndPersistTaskDeployReq
(
STrans
*
pTrans
,
SStreamTask
*
pTask
,
const
SEpSet
*
pEpSet
)
{
SCoder
encoder
;
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
NULL
,
0
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
encoder
.
pos
;
tCoderClear
(
&
encoder
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
streamTaskId
=
pTask
->
taskId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
abuf
,
tlen
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
tCoderClear
(
&
encoder
);
STransAction
action
=
{
0
};
memcpy
(
&
action
.
epSet
,
pEpSet
,
sizeof
(
SEpSet
));
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_SND_TASK_DEPLOY
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
return
-
1
;
}
return
0
;
}
int32_t
mndAssignTaskToVg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamTask
*
pTask
,
SSubplan
*
plan
,
const
SVgObj
*
pVgroup
)
{
int32_t
msgLen
;
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
if
(
qSubPlanToString
(
plan
,
&
pTask
->
qmsg
,
&
msgLen
)
<
0
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
plan
->
execNode
.
epSet
);
return
0
;
}
int32_t
mndAssignTaskToSnode
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamTask
*
pTask
,
SSubplan
*
plan
,
const
SSnodeObj
*
pSnode
)
{
return
0
;
}
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
...
...
@@ -44,7 +91,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t
totLevel
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
pStream
->
tasks
=
taosArrayInit
(
totLevel
,
sizeof
(
SArray
));
int32_t
msgLen
;
for
(
int32_t
level
=
0
;
level
<
totLevel
;
level
++
)
{
SArray
*
taskOneLevel
=
taosArrayInit
(
0
,
sizeof
(
SStreamTask
));
SNodeListNode
*
inner
=
nodesListGetNode
(
pPlan
->
pSubplans
,
level
);
...
...
@@ -67,43 +113,16 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// send to vnode
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
if
(
qSubPlanToString
(
plan
,
&
pTask
->
qmsg
,
&
msgLen
)
<
0
)
{
// TODO: set to
pTask
->
parallel
=
4
;
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
taosArrayPush
(
taskOneLevel
,
pTask
);
SCoder
encoder
;
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
NULL
,
0
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
encoder
.
pos
;
tCoderClear
(
&
encoder
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
streamTaskId
=
pTask
->
taskId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
abuf
,
tlen
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
tCoderClear
(
&
encoder
);
STransAction
action
=
{
0
};
action
.
epSet
=
plan
->
execNode
.
epSet
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_TASK_DEPLOY
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
return
-
1
;
}
}
}
else
if
(
plan
->
subplanType
==
SUBPLAN_TYPE_SCAN
)
{
// duplicatable
...
...
@@ -113,88 +132,26 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// if has snode, set to shared thread num in snode
parallel
=
SND_SHARED_THREAD_NUM
;
for
(
int32_t
i
=
0
;
i
<
parallel
;
i
++
)
{
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
// TODO:get snode id and ep
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
if
(
qSubPlanToString
(
plan
,
&
pTask
->
qmsg
,
&
msgLen
)
<
0
)
{
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
taosArrayPush
(
taskOneLevel
,
pTask
);
SCoder
encoder
;
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
NULL
,
0
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
encoder
.
pos
;
tCoderClear
(
&
encoder
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
streamTaskId
=
pTask
->
taskId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
abuf
,
tlen
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
tCoderClear
(
&
encoder
);
STransAction
action
=
{
0
};
action
.
epSet
=
plan
->
execNode
.
epSet
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_SND_TASK_DEPLOY
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
return
-
1
;
}
}
}
else
{
// not duplicatable
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
pTask
->
parallel
=
parallel
;
// TODO:get snode id and ep
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
if
(
qSubPlanToString
(
plan
,
&
pTask
->
qmsg
,
&
msgLen
)
<
0
)
{
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
taosArrayPush
(
taskOneLevel
,
pTask
);
}
else
{
// not duplicatable
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
SCoder
encoder
;
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
NULL
,
0
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
encoder
.
pos
;
tCoderClear
(
&
encoder
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
streamTaskId
=
pTask
->
taskId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
abuf
,
tlen
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
tCoderClear
(
&
encoder
);
STransAction
action
=
{
0
};
action
.
epSet
=
plan
->
execNode
.
epSet
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_SND_TASK_DEPLOY
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
// TODO: get snode
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
}
taosArrayPush
(
taskOneLevel
,
pTask
);
}
taosArrayPush
(
pStream
->
tasks
,
taskOneLevel
);
}
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
70cddceb
...
...
@@ -220,8 +220,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj
.
dbUid
=
pDb
->
uid
;
streamObj
.
version
=
1
;
streamObj
.
sql
=
pCreate
->
sql
;
streamObj
.
physicalPlan
=
pCreate
->
physicalPlan
;
streamObj
.
logicalPlan
=
pCreate
->
logicalPlan
;
streamObj
.
physicalPlan
=
""
;
streamObj
.
logicalPlan
=
""
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_CREATE_STREAM
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
70cddceb
...
...
@@ -433,12 +433,12 @@ ALLOC_VGROUP_OVER:
return
code
;
}
SEpSet
mndGetVgroupEpset
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
)
{
SEpSet
mndGetVgroupEpset
(
SMnode
*
pMnode
,
const
SVgObj
*
pVgroup
)
{
SEpSet
epset
=
{
0
};
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
v
];
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
const
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
v
];
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
continue
;
if
(
pVgid
->
role
==
TAOS_SYNC_STATE_LEADER
)
{
...
...
source/dnode/snode/src/snode.c
浏览文件 @
70cddceb
...
...
@@ -57,7 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) {
}
int32_t
sndMetaDeployTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
pTask
->
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
qmsg
,
NULL
);
pTask
->
runner
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
qmsg
,
NULL
);
return
taosHashPut
(
pMeta
->
pHash
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
void
*
));
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录