Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9322f803
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9322f803
编写于
8月 11, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(stream): register the rsp callback.
上级
1e636a2e
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
21 addition
and
24 deletion
+21
-24
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+0
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+20
-22
未找到文件。
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
9322f803
...
...
@@ -203,6 +203,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_PAUSE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RESUME_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_STREAM_TASK_UPDATE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
9322f803
...
...
@@ -87,8 +87,6 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_STREAM
,
mndProcessDropStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_NODECHECK_TIMER
,
mndProcessNodeCheckReq
);
/*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_DEPLOY_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_DROP_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_PAUSE_RSP
,
mndTransProcessRsp
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
9322f803
...
...
@@ -680,7 +680,6 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
SRpcMsg
rsp
=
{.
info
=
pMsg
->
info
,
.
pCont
=
buf
,
.
contLen
=
len
,
.
code
=
0
};
tmsgSendRsp
(
&
rsp
);
return
0
;
}
...
...
@@ -1663,6 +1662,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
return
0
;
}
// todo refactor.
int32_t
vnodeEnqueueStreamMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
STQ
*
pTq
=
pVnode
->
pTq
;
SMsgHead
*
msgStr
=
pMsg
->
pCont
;
...
...
@@ -1830,26 +1830,25 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
char
*
msg
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
len
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
int32_t
code
=
0
;
SRpcMsg
rsp
=
{.
info
=
pMsg
->
info
,
.
code
=
TSDB_CODE_SUCCESS
};
SStreamTaskNodeUpdateMsg
req
=
{
0
};
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
msg
,
len
);
if
(
tDecodeStreamTaskUpdateMsg
(
&
decoder
,
&
req
)
<
0
)
{
code
=
TSDB_CODE_MSG_DECODE_ERROR
;
tDecoderClear
(
&
decoder
);
tqError
(
"vgId:%d failed to decode task update msg, code:%s"
,
vgId
,
tstrerror
(
code
));
return
code
;
rsp
.
code
=
TSDB_CODE_MSG_DECODE_ERROR
;
tqError
(
"vgId:%d failed to decode task update msg, code:%s"
,
vgId
,
tstrerror
(
rsp
.
code
));
goto
_end
;
}
tDecoderClear
(
&
decoder
);
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pMeta
,
req
.
streamId
,
req
.
taskId
);
if
(
pTask
==
NULL
)
{
tqError
(
"vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already"
,
pMeta
->
vgId
,
req
.
taskId
);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return
TSDB_CODE_SUCCESS
;
rsp
.
code
=
TSDB_CODE_SUCCESS
;
goto
_end
;
}
tqDebug
(
"s-task:%s receive task nodeEp update msg from mnode"
,
pTask
->
id
.
idStr
);
...
...
@@ -1858,27 +1857,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask
*
pHistoryTask
=
NULL
;
if
(
pTask
->
historyTaskId
.
taskId
!=
0
)
{
pHistoryTask
=
streamMetaAcquireTask
(
pMeta
,
pTask
->
historyTaskId
.
streamId
,
pTask
->
historyTaskId
.
taskId
);
if
(
pHistoryTask
==
NULL
)
{
if
(
pHistoryTask
!=
NULL
)
{
tqDebug
(
"s-task:%s fill-history task handle task update along with related stream task"
,
pHistoryTask
->
id
.
idStr
);
streamTaskUpdateEpsetInfo
(
pHistoryTask
,
req
.
pNodeList
);
streamTaskRestart
(
pHistoryTask
,
NULL
);
streamMetaReleaseTask
(
pMeta
,
pHistoryTask
);
}
else
{
tqError
(
"vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped"
,
pMeta
->
vgId
,
pTask
->
historyTaskId
.
taskId
);
streamMetaReleaseTask
(
pMeta
,
pTask
);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return
TSDB_CODE_SUCCESS
;
}
tqDebug
(
"s-task:%s fill-history task handle task update along with related stream task"
,
pHistoryTask
->
id
.
idStr
);
streamTaskUpdateEpsetInfo
(
pHistoryTask
,
req
.
pNodeList
);
}
if
(
pHistoryTask
!=
NULL
)
{
streamTaskRestart
(
pHistoryTask
,
NULL
);
streamMetaReleaseTask
(
pMeta
,
pHistoryTask
);
}
streamTaskRestart
(
pTask
,
NULL
);
streamMetaReleaseTask
(
pMeta
,
pTask
);
return
TSDB_CODE_SUCCESS
;
int32_t
code
=
rsp
.
code
;
_end:
tDecoderClear
(
&
decoder
);
tmsgSendRsp
(
&
rsp
);
return
code
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录