Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b6f893ce
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b6f893ce
编写于
5月 09, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
5月 09, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12250 from taosdata/fix/dnode
refactor: node mgmt
上级
7c128c8d
ede2bc03
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
192 addition
and
115 deletion
+192
-115
source/dnode/mgmt/implement/src/dmEps.c
source/dnode/mgmt/implement/src/dmEps.c
+15
-15
source/dnode/mgmt/implement/src/dmWorker.c
source/dnode/mgmt/implement/src/dmWorker.c
+20
-8
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
+19
-5
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
+11
-11
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+1
-3
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+67
-39
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
+28
-17
source/dnode/mgmt/mgmt_snode/src/smWorker.c
source/dnode/mgmt/mgmt_snode/src/smWorker.c
+31
-17
未找到文件。
source/dnode/mgmt/implement/src/dmEps.c
浏览文件 @
b6f893ce
...
...
@@ -51,7 +51,7 @@ int32_t dmReadEps(SDnode *pDnode) {
pDnode
->
data
.
dnodeEps
=
taosArrayInit
(
1
,
sizeof
(
SDnodeEp
));
if
(
pDnode
->
data
.
dnodeEps
==
NULL
)
{
dError
(
"failed to calloc dnodeEp array since %s"
,
strerror
(
errno
));
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json"
,
pDnode
->
wrappers
[
DNODE
].
path
,
TD_DIRSEP
);
...
...
@@ -59,53 +59,53 @@ int32_t dmReadEps(SDnode *pDnode) {
if
(
pFile
==
NULL
)
{
// dDebug("file %s not exist", file);
code
=
0
;
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
len
=
(
int32_t
)
taosReadFile
(
pFile
,
content
,
maxLen
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
root
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dnodeId not found"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
pDnode
->
data
.
dnodeId
=
dnodeId
->
valueint
;
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterId"
);
if
(
!
clusterId
||
clusterId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since clusterId not found"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
pDnode
->
data
.
clusterId
=
atoll
(
clusterId
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
pDnode
->
data
.
dropped
=
dropped
->
valueint
;
cJSON
*
dnodes
=
cJSON_GetObjectItem
(
root
,
"dnodes"
);
if
(
!
dnodes
||
dnodes
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since dnodes not found"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
int32_t
numOfDnodes
=
cJSON_GetArraySize
(
dnodes
);
if
(
numOfDnodes
<=
0
)
{
dError
(
"failed to read %s since numOfDnodes:%d invalid"
,
file
,
numOfDnodes
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
for
(
int32_t
i
=
0
;
i
<
numOfDnodes
;
++
i
)
{
...
...
@@ -117,7 +117,7 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON
*
did
=
cJSON_GetObjectItem
(
node
,
"id"
);
if
(
!
did
||
did
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dnodeId not found"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
dnodeEp
.
id
=
did
->
valueint
;
...
...
@@ -125,14 +125,14 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON
*
dnodeFqdn
=
cJSON_GetObjectItem
(
node
,
"fqdn"
);
if
(
!
dnodeFqdn
||
dnodeFqdn
->
type
!=
cJSON_String
||
dnodeFqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s since dnodeFqdn not found"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
tstrncpy
(
dnodeEp
.
ep
.
fqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
dnodePort
=
cJSON_GetObjectItem
(
node
,
"port"
);
if
(
!
dnodePort
||
dnodePort
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dnodePort not found"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
dnodeEp
.
ep
.
port
=
dnodePort
->
valueint
;
...
...
@@ -140,7 +140,7 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON
*
isMnode
=
cJSON_GetObjectItem
(
node
,
"isMnode"
);
if
(
!
isMnode
||
isMnode
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since isMnode not found"
,
file
);
goto
PRASE_DNODE
_OVER
;
goto
_OVER
;
}
dnodeEp
.
isMnode
=
isMnode
->
valueint
;
...
...
@@ -151,7 +151,7 @@ int32_t dmReadEps(SDnode *pDnode) {
dDebug
(
"succcessed to read file %s"
,
file
);
dmPrintEps
(
pDnode
);
PRASE_DNODE
_OVER:
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
...
...
@@ -176,7 +176,7 @@ PRASE_DNODE_OVER:
int32_t
dmWriteEps
(
SDnode
*
pDnode
)
{
char
file
[
PATH_MAX
]
=
{
0
};
char
realfile
[
PATH_MAX
];
char
realfile
[
PATH_MAX
]
=
{
0
}
;
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json.bak"
,
pDnode
->
wrappers
[
DNODE
].
path
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%sdnode.json"
,
pDnode
->
wrappers
[
DNODE
].
path
,
TD_DIRSEP
);
...
...
source/dnode/mgmt/implement/src/dmWorker.c
浏览文件 @
b6f893ce
...
...
@@ -105,12 +105,13 @@ void dmStopMonitorThread(SDnode *pDnode) {
}
static
void
dmProcessMgmtQueue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
SDnode
*
pDnode
=
pInfo
->
ahandle
;
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
int32_t
code
=
-
1
;
SDnode
*
pDnode
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
tmsg_t
msgType
=
pMsg
->
rpcMsg
.
msgType
;
dTrace
(
"msg:%p, will be processed in dnode-mgmt queue"
,
pMsg
);
switch
(
pRpc
->
msgType
)
{
switch
(
msgType
)
{
case
TDMT_DND_CONFIG_DNODE
:
code
=
dmProcessConfigReq
(
pDnode
,
pMsg
);
break
;
...
...
@@ -148,9 +149,14 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
break
;
}
if
(
pRpc
->
msgType
&
1u
)
{
if
(
code
!=
0
)
code
=
terrno
;
SRpcMsg
rsp
=
{.
handle
=
pRpc
->
handle
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
code
,
.
refId
=
pRpc
->
refId
};
if
(
msgType
&
1u
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
SRpcMsg
rsp
=
{
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
code
=
code
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
};
rpcSendResponse
(
&
rsp
);
}
...
...
@@ -160,7 +166,13 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
int32_t
dmStartWorker
(
SDnode
*
pDnode
)
{
SSingleWorkerCfg
cfg
=
{.
min
=
1
,
.
max
=
1
,
.
name
=
"dnode-mgmt"
,
.
fp
=
(
FItem
)
dmProcessMgmtQueue
,
.
param
=
pDnode
};
SSingleWorkerCfg
cfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"dnode-mgmt"
,
.
fp
=
(
FItem
)
dmProcessMgmtQueue
,
.
param
=
pDnode
,
};
if
(
tSingleWorkerInit
(
&
pDnode
->
data
.
mgmtWorker
,
&
cfg
)
!=
0
)
{
dError
(
"failed to start dnode-mgmt worker since %s"
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
浏览文件 @
b6f893ce
...
...
@@ -18,7 +18,11 @@
static
void
bmSendErrorRsp
(
SNodeMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
code
=
code
,
.
refId
=
pMsg
->
rpcMsg
.
refId
};
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
code
=
code
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
};
tmsgSendRsp
(
&
rpcRsp
);
dTrace
(
"msg:%p, is freed"
,
pMsg
);
...
...
@@ -103,7 +107,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
int32_t
bmProcessWriteMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SBnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SBnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SMultiWorker
*
pWorker
=
&
pMgmt
->
writeWorker
;
dTrace
(
"msg:%p, put into worker:%s"
,
pMsg
,
pWorker
->
name
);
...
...
@@ -112,7 +116,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t
bmProcessMonitorMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SBnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SBnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
monitorWorker
;
dTrace
(
"msg:%p, put into worker:%s"
,
pMsg
,
pWorker
->
name
);
...
...
@@ -121,7 +125,12 @@ int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t
bmStartWorker
(
SBnodeMgmt
*
pMgmt
)
{
SMultiWorkerCfg
cfg
=
{.
max
=
1
,
.
name
=
"bnode-write"
,
.
fp
=
(
FItems
)
bmProcessWriteQueue
,
.
param
=
pMgmt
};
SMultiWorkerCfg
cfg
=
{
.
max
=
1
,
.
name
=
"bnode-write"
,
.
fp
=
(
FItems
)
bmProcessWriteQueue
,
.
param
=
pMgmt
,
};
if
(
tMultiWorkerInit
(
&
pMgmt
->
writeWorker
,
&
cfg
)
!=
0
)
{
dError
(
"failed to start bnode-write worker since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -129,7 +138,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if
(
tsMultiProcess
)
{
SSingleWorkerCfg
mCfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"bnode-monitor"
,
.
fp
=
(
FItem
)
bmProcessMonitorQueue
,
.
param
=
pMgmt
};
.
min
=
1
,
.
max
=
1
,
.
name
=
"bnode-monitor"
,
.
fp
=
(
FItem
)
bmProcessMonitorQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
dError
(
"failed to start bnode-monitor worker since %s"
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
浏览文件 @
b6f893ce
...
...
@@ -22,7 +22,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
int32_t
maxLen
=
4096
;
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
];
char
file
[
PATH_MAX
]
=
{
0
}
;
TdFilePtr
pFile
=
NULL
;
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json"
,
pMgmt
->
path
,
TD_DIRSEP
);
...
...
@@ -30,39 +30,39 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
if
(
pFile
==
NULL
)
{
// dDebug("file %s not exist", file);
code
=
0
;
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
len
=
(
int32_t
)
taosReadFile
(
pFile
,
content
,
maxLen
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
cJSON
*
deployed
=
cJSON_GetObjectItem
(
root
,
"deployed"
);
if
(
!
deployed
||
deployed
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since deployed not found"
,
file
);
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
*
pDeployed
=
deployed
->
valueint
;
cJSON
*
mnodes
=
cJSON_GetObjectItem
(
root
,
"mnodes"
);
if
(
!
mnodes
||
mnodes
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since nodes not found"
,
file
);
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
pMgmt
->
replica
=
cJSON_GetArraySize
(
mnodes
);
if
(
pMgmt
->
replica
<=
0
||
pMgmt
->
replica
>
TSDB_MAX_REPLICA
)
{
dError
(
"failed to read %s since mnodes size %d invalid"
,
file
,
pMgmt
->
replica
);
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
replica
;
++
i
)
{
...
...
@@ -74,21 +74,21 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
cJSON
*
id
=
cJSON_GetObjectItem
(
node
,
"id"
);
if
(
!
id
||
id
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since id not found"
,
file
);
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
pReplica
->
id
=
id
->
valueint
;
cJSON
*
fqdn
=
cJSON_GetObjectItem
(
node
,
"fqdn"
);
if
(
!
fqdn
||
fqdn
->
type
!=
cJSON_String
||
fqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s since fqdn not found"
,
file
);
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
tstrncpy
(
pReplica
->
fqdn
,
fqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
port
=
cJSON_GetObjectItem
(
node
,
"port"
);
if
(
!
port
||
port
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since port not found"
,
file
);
goto
PRASE_MNODE
_OVER
;
goto
_OVER
;
}
pReplica
->
port
=
port
->
valueint
;
}
...
...
@@ -96,7 +96,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
code
=
0
;
dDebug
(
"succcessed to read file %s, deployed:%d"
,
file
,
*
pDeployed
);
PRASE_MNODE
_OVER:
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
浏览文件 @
b6f893ce
...
...
@@ -161,9 +161,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
SMnodeOpt
option
=
{
0
};
if
(
!
deployed
)
{
dInfo
(
"mnode start to deploy"
);
// if (pWrapper->procType == DND_PROC_CHILD) {
pWrapper
->
pDnode
->
data
.
dnodeId
=
1
;
// }
pWrapper
->
pDnode
->
data
.
dnodeId
=
1
;
mmBuildOptionForDeploy
(
pMgmt
,
&
option
);
}
else
{
dInfo
(
"mnode start to open"
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
b6f893ce
...
...
@@ -17,42 +17,48 @@
#include "mmInt.h"
static
inline
void
mmSendRsp
(
SNodeMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
};
SRpcMsg
rsp
=
{
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
,
};
tmsgSendRsp
(
&
rsp
);
}
static
void
mmProcessQueue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
tmsg_t
msgType
=
pMsg
->
rpcMsg
.
msgType
;
dTrace
(
"msg:%p, get from mnode queue"
,
pMsg
);
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
int32_t
code
=
-
1
;
if
(
pMsg
->
rpcMsg
.
msgType
==
TDMT_DND_ALTER_MNODE
)
{
code
=
mmProcessAlterReq
(
pMgmt
,
pMsg
);
}
else
if
(
pMsg
->
rpcMsg
.
msgType
==
TDMT_MON_MM_INFO
)
{
code
=
mmProcessGetMonMmInfoReq
(
pMgmt
->
pWrapper
,
pMsg
);
}
else
if
(
pMsg
->
rpcMsg
.
msgType
==
TDMT_MON_MM_LOAD
)
{
code
=
mmProcessGetMnodeLoadsReq
(
pMgmt
->
pWrapper
,
pMsg
);
}
else
{
pMsg
->
pNode
=
pMgmt
->
pMnode
;
code
=
mndProcessMsg
(
pMsg
);
switch
(
msgType
)
{
case
TDMT_DND_ALTER_MNODE
:
code
=
mmProcessAlterReq
(
pMgmt
,
pMsg
);
break
;
case
TDMT_MON_MM_INFO
:
code
=
mmProcessGetMonMmInfoReq
(
pMgmt
->
pWrapper
,
pMsg
);
break
;
case
TDMT_MON_MM_LOAD
:
code
=
mmProcessGetMnodeLoadsReq
(
pMgmt
->
pWrapper
,
pMsg
);
break
;
default:
pMsg
->
pNode
=
pMgmt
->
pMnode
;
code
=
mndProcessMsg
(
pMsg
);
}
if
(
pRpc
->
msgType
&
1U
)
{
if
(
p
Rpc
->
handle
!=
NULL
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
if
(
msgType
&
1U
)
{
if
(
p
Msg
->
rpcMsg
.
handle
!=
NULL
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
mmSendRsp
(
pMsg
,
code
);
}
}
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
p
Rpc
->
pCont
);
rpcFreeCont
(
p
Msg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
}
...
...
@@ -78,38 +84,38 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem
(
pMsg
);
}
static
void
mmPutMsgToWorker
(
SSingleWorker
*
pWorker
,
SNodeMsg
*
pMsg
)
{
static
void
mmPut
Node
MsgToWorker
(
SSingleWorker
*
pWorker
,
SNodeMsg
*
pMsg
)
{
dTrace
(
"msg:%p, put into worker %s"
,
pMsg
,
pWorker
->
name
);
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
}
int32_t
mmProcessWriteMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
mmPutMsgToWorker
(
&
pMgmt
->
writeWorker
,
pMsg
);
mmPut
Node
MsgToWorker
(
&
pMgmt
->
writeWorker
,
pMsg
);
return
0
;
}
int32_t
mmProcessSyncMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
mmPutMsgToWorker
(
&
pMgmt
->
syncWorker
,
pMsg
);
mmPut
Node
MsgToWorker
(
&
pMgmt
->
syncWorker
,
pMsg
);
return
0
;
}
int32_t
mmProcessReadMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
mmPutMsgToWorker
(
&
pMgmt
->
readWorker
,
pMsg
);
mmPut
Node
MsgToWorker
(
&
pMgmt
->
readWorker
,
pMsg
);
return
0
;
}
int32_t
mmProcessQueryMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
mmPutMsgToWorker
(
&
pMgmt
->
queryWorker
,
pMsg
);
mmPut
Node
MsgToWorker
(
&
pMgmt
->
queryWorker
,
pMsg
);
return
0
;
}
int32_t
mmProcessMonitorMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
mmPutMsgToWorker
(
&
pMgmt
->
monitorWorker
,
pMsg
);
mmPut
Node
MsgToWorker
(
&
pMgmt
->
monitorWorker
,
pMsg
);
return
0
;
}
...
...
@@ -144,40 +150,62 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
}
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
)
{
SSingleWorkerCfg
qCfg
=
{.
min
=
tsNumOfMnodeQueryThreads
,
.
max
=
tsNumOfMnodeQueryThreads
,
.
name
=
"mnode-query"
,
.
fp
=
(
FItem
)
mmProcessQueryQueue
,
.
param
=
pMgmt
};
SSingleWorkerCfg
qCfg
=
{
.
min
=
tsNumOfMnodeQueryThreads
,
.
max
=
tsNumOfMnodeQueryThreads
,
.
name
=
"mnode-query"
,
.
fp
=
(
FItem
)
mmProcessQueryQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
queryWorker
,
&
qCfg
)
!=
0
)
{
dError
(
"failed to start mnode-query worker since %s"
,
terrstr
());
return
-
1
;
}
SSingleWorkerCfg
rCfg
=
{.
min
=
tsNumOfMnodeReadThreads
,
.
max
=
tsNumOfMnodeReadThreads
,
.
name
=
"mnode-read"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
};
SSingleWorkerCfg
rCfg
=
{
.
min
=
tsNumOfMnodeReadThreads
,
.
max
=
tsNumOfMnodeReadThreads
,
.
name
=
"mnode-read"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
readWorker
,
&
rCfg
)
!=
0
)
{
dError
(
"failed to start mnode-read worker since %s"
,
terrstr
());
return
-
1
;
}
SSingleWorkerCfg
wCfg
=
{.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-write"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
};
SSingleWorkerCfg
wCfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-write"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
writeWorker
,
&
wCfg
)
!=
0
)
{
dError
(
"failed to start mnode-write worker since %s"
,
terrstr
());
return
-
1
;
}
SSingleWorkerCfg
sCfg
=
{.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-sync"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
};
SSingleWorkerCfg
sCfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-sync"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
syncWorker
,
&
sCfg
)
!=
0
)
{
dError
(
"failed to start mnode mnode-sync worker since %s"
,
terrstr
());
return
-
1
;
}
if
(
tsMultiProcess
)
{
SSingleWorkerCfg
mCfg
=
{.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-monitor"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
};
SSingleWorkerCfg
mCfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-monitor"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
dError
(
"failed to start mnode mnode-monitor worker since %s"
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
浏览文件 @
b6f893ce
...
...
@@ -17,12 +17,14 @@
#include "qmInt.h"
static
inline
void
qmSendRsp
(
SNodeMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
};
SRpcMsg
rsp
=
{
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
,
};
tmsgSendRsp
(
&
rsp
);
}
...
...
@@ -145,22 +147,26 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
}
int32_t
qmStartWorker
(
SQnodeMgmt
*
pMgmt
)
{
SSingleWorkerCfg
queryCfg
=
{.
min
=
tsNumOfVnodeQueryThreads
,
.
max
=
tsNumOfVnodeQueryThreads
,
.
name
=
"qnode-query"
,
.
fp
=
(
FItem
)
qmProcessQueryQueue
,
.
param
=
pMgmt
};
SSingleWorkerCfg
queryCfg
=
{
.
min
=
tsNumOfVnodeQueryThreads
,
.
max
=
tsNumOfVnodeQueryThreads
,
.
name
=
"qnode-query"
,
.
fp
=
(
FItem
)
qmProcessQueryQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
queryWorker
,
&
queryCfg
)
!=
0
)
{
dError
(
"failed to start qnode-query worker since %s"
,
terrstr
());
return
-
1
;
}
SSingleWorkerCfg
fetchCfg
=
{.
min
=
tsNumOfQnodeFetchThreads
,
.
max
=
tsNumOfQnodeFetchThreads
,
.
name
=
"qnode-fetch"
,
.
fp
=
(
FItem
)
qmProcessFetchQueue
,
.
param
=
pMgmt
};
SSingleWorkerCfg
fetchCfg
=
{
.
min
=
tsNumOfQnodeFetchThreads
,
.
max
=
tsNumOfQnodeFetchThreads
,
.
name
=
"qnode-fetch"
,
.
fp
=
(
FItem
)
qmProcessFetchQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
fetchWorker
,
&
fetchCfg
)
!=
0
)
{
dError
(
"failed to start qnode-fetch worker since %s"
,
terrstr
());
...
...
@@ -169,7 +175,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
if
(
tsMultiProcess
)
{
SSingleWorkerCfg
mCfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"qnode-monitor"
,
.
fp
=
(
FItem
)
qmProcessMonitorQueue
,
.
param
=
pMgmt
};
.
min
=
1
,
.
max
=
1
,
.
name
=
"qnode-monitor"
,
.
fp
=
(
FItem
)
qmProcessMonitorQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
dError
(
"failed to start qnode-monitor worker since %s"
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/mgmt_snode/src/smWorker.c
浏览文件 @
b6f893ce
...
...
@@ -17,12 +17,14 @@
#include "smInt.h"
static
inline
void
smSendRsp
(
SNodeMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
};
SRpcMsg
rsp
=
{
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
,
};
tmsgSendRsp
(
&
rsp
);
}
...
...
@@ -90,7 +92,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return
-
1
;
}
SMultiWorkerCfg
cfg
=
{.
max
=
1
,
.
name
=
"snode-unique"
,
.
fp
=
smProcessUniqueQueue
,
.
param
=
pMgmt
};
SMultiWorkerCfg
cfg
=
{
.
max
=
1
,
.
name
=
"snode-unique"
,
.
fp
=
smProcessUniqueQueue
,
.
param
=
pMgmt
,
};
if
(
tMultiWorkerInit
(
pUniqueWorker
,
&
cfg
)
!=
0
)
{
dError
(
"failed to start snode-unique worker since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -101,11 +108,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
}
}
SSingleWorkerCfg
cfg
=
{.
min
=
tsNumOfSnodeSharedThreads
,
.
max
=
tsNumOfSnodeSharedThreads
,
.
name
=
"snode-shared"
,
.
fp
=
(
FItem
)
smProcessSharedQueue
,
.
param
=
pMgmt
};
SSingleWorkerCfg
cfg
=
{
.
min
=
tsNumOfSnodeSharedThreads
,
.
max
=
tsNumOfSnodeSharedThreads
,
.
name
=
"snode-shared"
,
.
fp
=
(
FItem
)
smProcessSharedQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
sharedWorker
,
&
cfg
))
{
dError
(
"failed to start snode shared-worker since %s"
,
terrstr
());
...
...
@@ -114,7 +123,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
if
(
tsMultiProcess
)
{
SSingleWorkerCfg
mCfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"snode-monitor"
,
.
fp
=
(
FItem
)
smProcessMonitorQueue
,
.
param
=
pMgmt
};
.
min
=
1
,
.
max
=
1
,
.
name
=
"snode-monitor"
,
.
fp
=
(
FItem
)
smProcessMonitorQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
dError
(
"failed to start snode-monitor worker since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -150,7 +164,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
}
int32_t
smProcessMgmtMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SSnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SMultiWorker
*
pWorker
=
taosArrayGetP
(
pMgmt
->
uniqueWorkers
,
0
);
if
(
pWorker
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
...
...
@@ -163,7 +177,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t
smProcessMonitorMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SSnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
monitorWorker
;
dTrace
(
"msg:%p, put into worker:%s"
,
pMsg
,
pWorker
->
name
);
...
...
@@ -172,7 +186,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t
smProcessUniqueMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SSnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
int32_t
index
=
smGetSWIdFromMsg
(
&
pMsg
->
rpcMsg
);
SMultiWorker
*
pWorker
=
taosArrayGetP
(
pMgmt
->
uniqueWorkers
,
index
);
if
(
pWorker
==
NULL
)
{
...
...
@@ -186,7 +200,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t
smProcessSharedMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SSnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
sharedWorker
;
dTrace
(
"msg:%p, put into worker:%s"
,
pMsg
,
pWorker
->
name
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录