Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
d1003fed
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d1003fed
编写于
5月 27, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-335] message put into peer queue
上级
cf687279
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
24 addition
and
21 deletion
+24
-21
src/dnode/inc/dnodeMPeer.h
src/dnode/inc/dnodeMPeer.h
+2
-0
src/dnode/inc/dnodeMRead.h
src/dnode/inc/dnodeMRead.h
+2
-0
src/dnode/inc/dnodeMWrite.h
src/dnode/inc/dnodeMWrite.h
+2
-0
src/dnode/src/dnodePeer.c
src/dnode/src/dnodePeer.c
+14
-16
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+3
-3
src/mnode/inc/mnodePeer.h
src/mnode/inc/mnodePeer.h
+1
-0
src/mnode/src/mnodePeer.c
src/mnode/src/mnodePeer.c
+0
-2
未找到文件。
src/dnode/inc/dnodeMPeer.h
浏览文件 @
d1003fed
...
...
@@ -22,6 +22,8 @@ extern "C" {
int32_t
dnodeInitMnodePeer
();
void
dnodeCleanupMnodePeer
();
int32_t
dnodeAllocateMnodePqueue
();
void
dnodeFreeMnodePqueue
();
void
dnodeDispatchToMnodePeerQueue
(
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
...
...
src/dnode/inc/dnodeMRead.h
浏览文件 @
d1003fed
...
...
@@ -22,6 +22,8 @@ extern "C" {
int32_t
dnodeInitMnodeRead
();
void
dnodeCleanupMnodeRead
();
int32_t
dnodeAllocateMnodeRqueue
();
void
dnodeFreeMnodeRqueue
();
void
dnodeDispatchToMnodeReadQueue
(
SRpcMsg
*
rpcMsg
);
#ifdef __cplusplus
...
...
src/dnode/inc/dnodeMWrite.h
浏览文件 @
d1003fed
...
...
@@ -22,6 +22,8 @@ extern "C" {
int32_t
dnodeInitMnodeWrite
();
void
dnodeCleanupMnodeWrite
();
int32_t
dnodeAllocateMnodeWqueue
();
void
dnodeFreeMnodeWqueue
();
void
dnodeDispatchToMnodeWriteQueue
(
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
...
...
src/dnode/src/dnodePeer.c
浏览文件 @
d1003fed
...
...
@@ -28,8 +28,7 @@
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeVWrite.h"
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "dnodeMPeer.h"
extern
void
dnodeUpdateMnodeIpSetForPeer
(
SRpcIpSet
*
pIpSet
);
static
void
(
*
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
...
...
@@ -50,11 +49,11 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
dnodeDispatchToDnodeMgmt
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_CONFIG_DNODE
]
=
dnodeDispatchToDnodeMgmt
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_CONFIG_TABLE
]
=
dnodeDispatchToMnode
Read
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_CONFIG_VNODE
]
=
dnodeDispatchToMnode
Read
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_AUTH
]
=
dnodeDispatchToMnode
Read
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_GRANT
]
=
dnodeDispatchToMnode
Write
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_STATUS
]
=
dnodeDispatchToMnode
Write
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_CONFIG_TABLE
]
=
dnodeDispatchToMnode
Peer
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_CONFIG_VNODE
]
=
dnodeDispatchToMnode
Peer
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_AUTH
]
=
dnodeDispatchToMnode
Peer
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_GRANT
]
=
dnodeDispatchToMnode
Peer
Queue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_DM_STATUS
]
=
dnodeDispatchToMnode
Peer
Queue
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
...
...
@@ -103,16 +102,14 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
rpcSendResponse
(
&
rspMsg
);
return
;
}
if
(
dnodeProcessReqMsgFp
[
pMsg
->
msgType
])
{
(
*
dnodeProcessReqMsgFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
dTrace
(
"RPC %p, message:%s not processed"
,
pMsg
->
handle
,
taosMsg
[
pMsg
->
msgType
]);
rspMsg
.
code
=
TSDB_CODE_MSG_NOT_PROCESSED
;
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
dTrace
(
"RPC %p, message:%s not processed"
,
pMsg
->
handle
,
taosMsg
[
pMsg
->
msgType
]);
return
;
}
}
...
...
@@ -148,13 +145,14 @@ void dnodeCleanupClient() {
}
static
void
dnodeProcessRspFromDnode
(
SRpcMsg
*
pMsg
,
SRpcIpSet
*
pIpSet
)
{
if
(
dnodeProcessRspMsgFp
[
pMsg
->
msgType
])
{
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_DM_STATUS_RSP
&&
pIpSet
)
{
dnodeUpdateMnodeIpSetForPeer
(
pIpSet
);
}
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_DM_STATUS_RSP
&&
pIpSet
)
{
dnodeUpdateMnodeIpSetForPeer
(
pIpSet
);
}
if
(
dnodeProcessRspMsgFp
[
pMsg
->
msgType
])
{
(
*
dnodeProcessRspMsgFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
dError
(
"RPC %p, msg:%s is not processed"
,
pMsg
->
handle
,
taosMsg
[
pMsg
->
msgType
]
);
mnodeProcessPeerRsp
(
pMsg
);
}
rpcFreeCont
(
pMsg
->
pCont
);
...
...
src/dnode/src/dnodeShell.c
浏览文件 @
d1003fed
...
...
@@ -61,9 +61,10 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_QUERY
]
=
dnodeDispatchToMnodeWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_STREAM
]
=
dnodeDispatchToMnodeWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_KILL_CONN
]
=
dnodeDispatchToMnodeWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_
HEARTBEAT
]
=
dnodeDispatchToMnodeWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_
CONFIG_DNODE
]
=
dnodeDispatchToMnodeWriteQueue
;
// the following message shall be treated as mnode query
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_HEARTBEAT
]
=
dnodeDispatchToMnodeReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CONNECT
]
=
dnodeDispatchToMnodeReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_USE_DB
]
=
dnodeDispatchToMnodeReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_TABLE_META
]
=
dnodeDispatchToMnodeReadQueue
;
...
...
@@ -71,7 +72,6 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_TABLES_META
]
=
dnodeDispatchToMnodeReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_SHOW
]
=
dnodeDispatchToMnodeReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_RETRIEVE
]
=
dnodeDispatchToMnodeReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CONFIG_DNODE
]
=
dnodeDispatchToMnodeReadQueue
;
int32_t
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
;
numOfThreads
=
(
int32_t
)
((
1
.
0
-
tsRatioOfQueryThreads
)
*
numOfThreads
/
2
.
0
);
...
...
src/mnode/inc/mnodePeer.h
浏览文件 @
d1003fed
...
...
@@ -24,6 +24,7 @@ extern "C" {
void
mnodeAddPeerRspHandle
(
uint8_t
msgType
,
void
(
*
fp
)(
SRpcMsg
*
rpcMsg
));
void
mnodeAddPeerMsgHandle
(
uint8_t
msgType
,
int32_t
(
*
fp
)(
SMnodeMsg
*
mnodeMsg
));
int32_t
mnodeProcessPeerReq
(
SMnodeMsg
*
pMsg
);
void
mnodeProcessPeerRsp
(
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
src/mnode/src/mnodePeer.c
浏览文件 @
d1003fed
...
...
@@ -81,6 +81,4 @@ void mnodeProcessPeerRsp(SRpcMsg *pMsg) {
}
else
{
mError
(
"msg:%s is not processed"
,
pMsg
->
handle
,
taosMsg
[
pMsg
->
msgType
]);
}
rpcFreeCont
(
pMsg
->
pCont
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录