Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
add51b49
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
add51b49
编写于
5月 11, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: node mgmt
上级
65e8316f
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
585 addition
and
650 deletion
+585
-650
include/common/tmsgcb.h
include/common/tmsgcb.h
+3
-0
source/common/src/tmsgcb.c
source/common/src/tmsgcb.c
+9
-0
source/dnode/mgmt/CMakeLists.txt
source/dnode/mgmt/CMakeLists.txt
+1
-0
source/dnode/mgmt/mgmt_dnode/CMakeLists.txt
source/dnode/mgmt/mgmt_dnode/CMakeLists.txt
+9
-0
source/dnode/mgmt/mgmt_dnode/inc/dmInt.h
source/dnode/mgmt/mgmt_dnode/inc/dmInt.h
+69
-0
source/dnode/mgmt/mgmt_dnode/src/dmEps.c
source/dnode/mgmt/mgmt_dnode/src/dmEps.c
+58
-59
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+155
-0
source/dnode/mgmt/mgmt_dnode/src/dmInt.c
source/dnode/mgmt/mgmt_dnode/src/dmInt.c
+104
-0
source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c
source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c
+105
-0
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
+44
-44
source/dnode/mgmt/node_mgmt/CMakeLists.txt
source/dnode/mgmt/node_mgmt/CMakeLists.txt
+2
-2
source/dnode/mgmt/node_mgmt/src/dmHandle.c
source/dnode/mgmt/node_mgmt/src/dmHandle.c
+0
-309
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
+0
-211
source/dnode/mgmt/node_util/inc/dmUtil.h
source/dnode/mgmt/node_util/inc/dmUtil.h
+26
-25
未找到文件。
include/common/tmsgcb.h
浏览文件 @
add51b49
...
...
@@ -42,6 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype);
typedef
int32_t
(
*
SendReqFp
)(
SMgmtWrapper
*
pWrapper
,
const
SEpSet
*
epSet
,
SRpcMsg
*
pReq
);
typedef
int32_t
(
*
SendMnodeReqFp
)(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pReq
);
typedef
void
(
*
SendRspFp
)(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
);
typedef
void
(
*
SendMnodeRecvFp
)(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pReq
,
const
SRpcMsg
*
pRsp
);
typedef
void
(
*
SendRedirectRspFp
)(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
,
const
SEpSet
*
pNewEpSet
);
typedef
void
(
*
RegisterBrokenLinkArgFp
)(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
typedef
void
(
*
ReleaseHandleFp
)(
SMgmtWrapper
*
pWrapper
,
void
*
handle
,
int8_t
type
);
...
...
@@ -55,6 +56,7 @@ typedef struct {
GetQueueSizeFp
qsizeFp
;
SendReqFp
sendReqFp
;
SendRspFp
sendRspFp
;
SendMnodeRecvFp
sendMnodeRecvFp
;
SendRedirectRspFp
sendRedirectRspFp
;
RegisterBrokenLinkArgFp
registerBrokenLinkArgFp
;
ReleaseHandleFp
releaseHandleFp
;
...
...
@@ -66,6 +68,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t
tmsgGetQueueSize
(
const
SMsgCb
*
pMsgCb
,
int32_t
vgId
,
EQueueType
qtype
);
int32_t
tmsgSendReq
(
const
SMsgCb
*
pMsgCb
,
const
SEpSet
*
epSet
,
SRpcMsg
*
pReq
);
void
tmsgSendRsp
(
const
SRpcMsg
*
pRsp
);
void
tmsgSendMnodeRecv
(
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
);
void
tmsgSendRedirectRsp
(
const
SRpcMsg
*
pRsp
,
const
SEpSet
*
pNewEpSet
);
void
tmsgRegisterBrokenLinkArg
(
const
SMsgCb
*
pMsgCb
,
SRpcMsg
*
pMsg
);
void
tmsgReleaseHandle
(
void
*
handle
,
int8_t
type
);
...
...
source/common/src/tmsgcb.c
浏览文件 @
add51b49
...
...
@@ -69,6 +69,15 @@ void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
}
}
void
tmsgSendMnodeRecv
(
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
)
{
SendMnodeRecvFp
fp
=
tsDefaultMsgCb
.
sendMnodeRecvFp
;
if
(
fp
!=
NULL
)
{
(
*
fp
)(
tsDefaultMsgCb
.
pWrapper
,
pReq
,
pRsp
);
}
else
{
terrno
=
TSDB_CODE_INVALID_PTR
;
}
}
void
tmsgRegisterBrokenLinkArg
(
const
SMsgCb
*
pMsgCb
,
SRpcMsg
*
pMsg
)
{
RegisterBrokenLinkArgFp
fp
=
pMsgCb
->
registerBrokenLinkArgFp
;
if
(
fp
!=
NULL
)
{
...
...
source/dnode/mgmt/CMakeLists.txt
浏览文件 @
add51b49
...
...
@@ -5,6 +5,7 @@ add_subdirectory(mgmt_mnode)
add_subdirectory
(
mgmt_qnode
)
add_subdirectory
(
mgmt_snode
)
add_subdirectory
(
mgmt_vnode
)
add_subdirectory
(
mgmt_dnode
)
add_subdirectory
(
test
)
aux_source_directory
(
exe EXEC_SRC
)
...
...
source/dnode/mgmt/mgmt_dnode/CMakeLists.txt
0 → 100644
浏览文件 @
add51b49
aux_source_directory
(
src MGMT_DNODE
)
add_library
(
mgmt_dnode STATIC
${
MGMT_DNODE
}
)
target_include_directories
(
mgmt_dnode
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_link_libraries
(
mgmt_dnode node_util
)
\ No newline at end of file
source/dnode/mgmt/mgmt_dnode/inc/dmInt.h
0 → 100644
浏览文件 @
add51b49
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_QNODE_INT_H_
#define _TD_DND_QNODE_INT_H_
#include "dmUtil.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SDnodeMgmt
{
struct
SDnode
*
pDnode
;
SMsgCb
msgCb
;
const
char
*
path
;
const
char
*
name
;
TdThread
*
statusThreadId
;
TdThread
*
monitorThreadId
;
SSingleWorker
mgmtWorker
;
ProcessCreateNodeFp
processCreateNodeFp
;
ProcessDropNodeFp
processDropNodeFp
;
IsNodeDeployedFp
isNodeDeployedFp
;
SDnodeData
data
;
}
SDnodeMgmt
;
// dmEps.c
int32_t
dmReadEps
(
SDnodeMgmt
*
pMgmt
);
int32_t
dmWriteEps
(
SDnodeMgmt
*
pMgmt
);
void
dmUpdateEps
(
SDnodeMgmt
*
pMgmt
,
SArray
*
pDnodeEps
);
// dmHandle.c
SArray
*
dmGetMsgHandles
();
void
dmSendStatusReq
(
SDnodeMgmt
*
pMgmt
);
int32_t
dmProcessConfigReq
(
SDnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessAuthRsp
(
SDnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessGrantRsp
(
SDnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
// dmMonitor.c
void
dmGetVnodeLoads
(
SDnodeMgmt
*
pMgmt
,
SMonVloadInfo
*
pInfo
);
void
dmGetMnodeLoads
(
SDnodeMgmt
*
pMgmt
,
SMonMloadInfo
*
pInfo
);
void
dmSendMonitorReport
(
SDnodeMgmt
*
pMgmt
);
// dmWorker.c
int32_t
dmPutNodeMsgToMgmtQueue
(
SDnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmStartStatusThread
(
SDnodeMgmt
*
pMgmt
);
void
dmStopStatusThread
(
SDnodeMgmt
*
pMgmt
);
int32_t
dmStartMonitorThread
(
SDnodeMgmt
*
pMgmt
);
void
dmStopMonitorThread
(
SDnodeMgmt
*
pMgmt
);
int32_t
dmStartWorker
(
SDnodeMgmt
*
pMgmt
);
void
dmStopWorker
(
SDnodeMgmt
*
pMgmt
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DND_QNODE_INT_H_*/
\ No newline at end of file
source/dnode/mgmt/
node_mgmt
/src/dmEps.c
→
source/dnode/mgmt/
mgmt_dnode
/src/dmEps.c
浏览文件 @
add51b49
...
...
@@ -14,16 +14,16 @@
*/
#define _DEFAULT_SOURCE
#include "dmI
mp
.h"
#include "dmI
nt
.h"
static
void
dmPrintEps
(
SDnode
*
pDnode
);
static
bool
dmIsEpChanged
(
SDnode
*
pDnode
,
int32_t
dnodeId
,
const
char
*
ep
);
static
void
dmResetEps
(
SDnode
*
pDnode
,
SArray
*
dnodeEps
);
static
void
dmPrintEps
(
SDnode
Mgmt
*
pMgmt
);
static
bool
dmIsEpChanged
(
SDnode
Mgmt
*
pMgmt
,
int32_t
dnodeId
,
const
char
*
ep
);
static
void
dmResetEps
(
SDnode
Mgmt
*
pMgmt
,
SArray
*
dnodeEps
);
static
void
dmGetDnodeEp
(
SDnode
*
pDnode
,
int32_t
dnodeId
,
char
*
pEp
,
char
*
pFqdn
,
uint16_t
*
pPort
)
{
taosRLockLatch
(
&
p
Dnode
->
data
.
latch
);
static
void
dmGetDnodeEp
(
SDnode
Mgmt
*
pMgmt
,
int32_t
dnodeId
,
char
*
pEp
,
char
*
pFqdn
,
uint16_t
*
pPort
)
{
taosRLockLatch
(
&
p
Mgmt
->
data
.
latch
);
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
p
Dnode
->
data
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
p
Mgmt
->
data
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
pDnodeEp
!=
NULL
)
{
if
(
pPort
!=
NULL
)
{
*
pPort
=
pDnodeEp
->
ep
.
port
;
...
...
@@ -36,10 +36,10 @@ static void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn
}
}
taosRUnLockLatch
(
&
p
Dnode
->
data
.
latch
);
taosRUnLockLatch
(
&
p
Mgmt
->
data
.
latch
);
}
int32_t
dmReadEps
(
SDnode
*
pDnode
)
{
int32_t
dmReadEps
(
SDnode
Mgmt
*
pMgmt
)
{
int32_t
code
=
TSDB_CODE_INVALID_JSON_FORMAT
;
int32_t
len
=
0
;
int32_t
maxLen
=
256
*
1024
;
...
...
@@ -48,16 +48,15 @@ int32_t dmReadEps(SDnode *pDnode) {
char
file
[
PATH_MAX
]
=
{
0
};
TdFilePtr
pFile
=
NULL
;
p
Dnode
->
data
.
dnodeEps
=
taosArrayInit
(
1
,
sizeof
(
SDnodeEp
));
if
(
p
Dnode
->
data
.
dnodeEps
==
NULL
)
{
p
Mgmt
->
data
.
dnodeEps
=
taosArrayInit
(
1
,
sizeof
(
SDnodeEp
));
if
(
p
Mgmt
->
data
.
dnodeEps
==
NULL
)
{
dError
(
"failed to calloc dnodeEp array since %s"
,
strerror
(
errno
));
goto
_OVER
;
}
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json"
,
p
Dnode
->
wrappers
[
DNODE
].
path
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json"
,
p
Mgmt
->
path
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
// dDebug("file %s not exist", file);
code
=
0
;
goto
_OVER
;
}
...
...
@@ -80,21 +79,21 @@ int32_t dmReadEps(SDnode *pDnode) {
dError
(
"failed to read %s since dnodeId not found"
,
file
);
goto
_OVER
;
}
p
Dnode
->
data
.
dnodeId
=
dnodeId
->
valueint
;
p
Mgmt
->
data
.
dnodeId
=
dnodeId
->
valueint
;
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterId"
);
if
(
!
clusterId
||
clusterId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since clusterId not found"
,
file
);
goto
_OVER
;
}
p
Dnode
->
data
.
clusterId
=
atoll
(
clusterId
->
valuestring
);
p
Mgmt
->
data
.
clusterId
=
atoll
(
clusterId
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
_OVER
;
}
p
Dnode
->
data
.
dropped
=
dropped
->
valueint
;
p
Mgmt
->
data
.
dropped
=
dropped
->
valueint
;
cJSON
*
dnodes
=
cJSON_GetObjectItem
(
root
,
"dnodes"
);
if
(
!
dnodes
||
dnodes
->
type
!=
cJSON_Array
)
{
...
...
@@ -144,29 +143,29 @@ int32_t dmReadEps(SDnode *pDnode) {
}
dnodeEp
.
isMnode
=
isMnode
->
valueint
;
taosArrayPush
(
p
Dnode
->
data
.
dnodeEps
,
&
dnodeEp
);
taosArrayPush
(
p
Mgmt
->
data
.
dnodeEps
,
&
dnodeEp
);
}
code
=
0
;
dDebug
(
"succcessed to read file %s"
,
file
);
dmPrintEps
(
p
Dnode
);
dmPrintEps
(
p
Mgmt
);
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
if
(
taosArrayGetSize
(
p
Dnode
->
data
.
dnodeEps
)
==
0
)
{
if
(
taosArrayGetSize
(
p
Mgmt
->
data
.
dnodeEps
)
==
0
)
{
SDnodeEp
dnodeEp
=
{
0
};
dnodeEp
.
isMnode
=
1
;
taosGetFqdnPortFromEp
(
p
Dnode
->
data
.
firstEp
,
&
dnodeEp
.
ep
);
taosArrayPush
(
p
Dnode
->
data
.
dnodeEps
,
&
dnodeEp
);
taosGetFqdnPortFromEp
(
p
Mgmt
->
data
.
firstEp
,
&
dnodeEp
.
ep
);
taosArrayPush
(
p
Mgmt
->
data
.
dnodeEps
,
&
dnodeEp
);
}
dmResetEps
(
p
Dnode
,
pDnode
->
data
.
dnodeEps
);
dmResetEps
(
p
Mgmt
,
pMgmt
->
data
.
dnodeEps
);
if
(
dmIsEpChanged
(
p
Dnode
,
pDnode
->
data
.
dnodeId
,
pDnode
->
data
.
localEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
p
Dnode
->
data
.
localEp
,
file
);
if
(
dmIsEpChanged
(
p
Mgmt
,
pMgmt
->
data
.
dnodeId
,
pMgmt
->
data
.
localEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
p
Mgmt
->
data
.
localEp
,
file
);
return
-
1
;
}
...
...
@@ -174,11 +173,11 @@ _OVER:
return
code
;
}
int32_t
dmWriteEps
(
SDnode
*
pDnode
)
{
int32_t
dmWriteEps
(
SDnode
Mgmt
*
pMgmt
)
{
char
file
[
PATH_MAX
]
=
{
0
};
char
realfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json.bak"
,
p
Dnode
->
wrappers
[
DNODE
].
path
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%sdnode.json"
,
p
Dnode
->
wrappers
[
DNODE
].
path
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json.bak"
,
p
Mgmt
->
path
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%sdnode.json"
,
p
Mgmt
->
path
,
TD_DIRSEP
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
...
...
@@ -192,14 +191,14 @@ int32_t dmWriteEps(SDnode *pDnode) {
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
: %d,
\n
"
,
p
Dnode
->
data
.
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
p
Dnode
->
data
.
clusterId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d,
\n
"
,
p
Dnode
->
data
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
: %d,
\n
"
,
p
Mgmt
->
data
.
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
p
Mgmt
->
data
.
clusterId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d,
\n
"
,
p
Mgmt
->
data
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodes
\"
: [{
\n
"
);
int32_t
numOfEps
=
(
int32_t
)
taosArrayGetSize
(
p
Dnode
->
data
.
dnodeEps
);
int32_t
numOfEps
=
(
int32_t
)
taosArrayGetSize
(
p
Mgmt
->
data
.
dnodeEps
);
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
++
i
)
{
SDnodeEp
*
pDnodeEp
=
taosArrayGet
(
p
Dnode
->
data
.
dnodeEps
,
i
);
SDnodeEp
*
pDnodeEp
=
taosArrayGet
(
p
Mgmt
->
data
.
dnodeEps
,
i
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
id
\"
: %d,
\n
"
,
pDnodeEp
->
id
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pDnodeEp
->
ep
.
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u,
\n
"
,
pDnodeEp
->
ep
.
port
);
...
...
@@ -223,41 +222,41 @@ int32_t dmWriteEps(SDnode *pDnode) {
return
-
1
;
}
p
Dnode
->
data
.
updateTime
=
taosGetTimestampMs
();
p
Mgmt
->
data
.
updateTime
=
taosGetTimestampMs
();
dDebug
(
"successed to write %s"
,
realfile
);
return
0
;
}
void
dmUpdateEps
(
SDnode
*
pDnode
,
SArray
*
eps
)
{
void
dmUpdateEps
(
SDnode
Mgmt
*
pMgmt
,
SArray
*
eps
)
{
int32_t
numOfEps
=
taosArrayGetSize
(
eps
);
if
(
numOfEps
<=
0
)
return
;
taosWLockLatch
(
&
p
Dnode
->
data
.
latch
);
taosWLockLatch
(
&
p
Mgmt
->
data
.
latch
);
int32_t
numOfEpsOld
=
(
int32_t
)
taosArrayGetSize
(
p
Dnode
->
data
.
dnodeEps
);
int32_t
numOfEpsOld
=
(
int32_t
)
taosArrayGetSize
(
p
Mgmt
->
data
.
dnodeEps
);
if
(
numOfEps
!=
numOfEpsOld
)
{
dmResetEps
(
p
Dnode
,
eps
);
dmWriteEps
(
p
Dnode
);
dmResetEps
(
p
Mgmt
,
eps
);
dmWriteEps
(
p
Mgmt
);
}
else
{
int32_t
size
=
numOfEps
*
sizeof
(
SDnodeEp
);
if
(
memcmp
(
p
Dnode
->
data
.
dnodeEps
->
pData
,
eps
->
pData
,
size
)
!=
0
)
{
dmResetEps
(
p
Dnode
,
eps
);
dmWriteEps
(
p
Dnode
);
if
(
memcmp
(
p
Mgmt
->
data
.
dnodeEps
->
pData
,
eps
->
pData
,
size
)
!=
0
)
{
dmResetEps
(
p
Mgmt
,
eps
);
dmWriteEps
(
p
Mgmt
);
}
}
taosWUnLockLatch
(
&
p
Dnode
->
data
.
latch
);
taosWUnLockLatch
(
&
p
Mgmt
->
data
.
latch
);
}
static
void
dmResetEps
(
SDnode
*
pDnode
,
SArray
*
dnodeEps
)
{
if
(
p
Dnode
->
data
.
dnodeEps
!=
dnodeEps
)
{
SArray
*
tmp
=
p
Dnode
->
data
.
dnodeEps
;
p
Dnode
->
data
.
dnodeEps
=
taosArrayDup
(
dnodeEps
);
static
void
dmResetEps
(
SDnode
Mgmt
*
pMgmt
,
SArray
*
dnodeEps
)
{
if
(
p
Mgmt
->
data
.
dnodeEps
!=
dnodeEps
)
{
SArray
*
tmp
=
p
Mgmt
->
data
.
dnodeEps
;
p
Mgmt
->
data
.
dnodeEps
=
taosArrayDup
(
dnodeEps
);
taosArrayDestroy
(
tmp
);
}
p
Dnode
->
data
.
mnodeEps
.
inUse
=
0
;
p
Dnode
->
data
.
mnodeEps
.
numOfEps
=
0
;
p
Mgmt
->
data
.
mnodeEps
.
inUse
=
0
;
p
Mgmt
->
data
.
mnodeEps
.
numOfEps
=
0
;
int32_t
mIndex
=
0
;
int32_t
numOfEps
=
(
int32_t
)
taosArrayGetSize
(
dnodeEps
);
...
...
@@ -266,35 +265,35 @@ static void dmResetEps(SDnode *pDnode, SArray *dnodeEps) {
SDnodeEp
*
pDnodeEp
=
taosArrayGet
(
dnodeEps
,
i
);
if
(
!
pDnodeEp
->
isMnode
)
continue
;
if
(
mIndex
>=
TSDB_MAX_REPLICA
)
continue
;
p
Dnode
->
data
.
mnodeEps
.
numOfEps
++
;
p
Mgmt
->
data
.
mnodeEps
.
numOfEps
++
;
p
Dnode
->
data
.
mnodeEps
.
eps
[
mIndex
]
=
pDnodeEp
->
ep
;
p
Mgmt
->
data
.
mnodeEps
.
eps
[
mIndex
]
=
pDnodeEp
->
ep
;
mIndex
++
;
}
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
i
++
)
{
SDnodeEp
*
pDnodeEp
=
taosArrayGet
(
dnodeEps
,
i
);
taosHashPut
(
p
Dnode
->
data
.
dnodeHash
,
&
pDnodeEp
->
id
,
sizeof
(
int32_t
),
pDnodeEp
,
sizeof
(
SDnodeEp
));
taosHashPut
(
p
Mgmt
->
data
.
dnodeHash
,
&
pDnodeEp
->
id
,
sizeof
(
int32_t
),
pDnodeEp
,
sizeof
(
SDnodeEp
));
}
dmPrintEps
(
p
Dnode
);
dmPrintEps
(
p
Mgmt
);
}
static
void
dmPrintEps
(
SDnode
*
pDnode
)
{
int32_t
numOfEps
=
(
int32_t
)
taosArrayGetSize
(
p
Dnode
->
data
.
dnodeEps
);
static
void
dmPrintEps
(
SDnode
Mgmt
*
pMgmt
)
{
int32_t
numOfEps
=
(
int32_t
)
taosArrayGetSize
(
p
Mgmt
->
data
.
dnodeEps
);
dDebug
(
"print dnode ep list, num:%d"
,
numOfEps
);
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
i
++
)
{
SDnodeEp
*
pEp
=
taosArrayGet
(
p
Dnode
->
data
.
dnodeEps
,
i
);
SDnodeEp
*
pEp
=
taosArrayGet
(
p
Mgmt
->
data
.
dnodeEps
,
i
);
dDebug
(
"dnode:%d, fqdn:%s port:%u isMnode:%d"
,
pEp
->
id
,
pEp
->
ep
.
fqdn
,
pEp
->
ep
.
port
,
pEp
->
isMnode
);
}
}
static
bool
dmIsEpChanged
(
SDnode
*
pDnode
,
int32_t
dnodeId
,
const
char
*
ep
)
{
static
bool
dmIsEpChanged
(
SDnode
Mgmt
*
pMgmt
,
int32_t
dnodeId
,
const
char
*
ep
)
{
bool
changed
=
false
;
if
(
dnodeId
==
0
)
return
changed
;
taosRLockLatch
(
&
p
Dnode
->
data
.
latch
);
taosRLockLatch
(
&
p
Mgmt
->
data
.
latch
);
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
p
Dnode
->
data
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
p
Mgmt
->
data
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
pDnodeEp
!=
NULL
)
{
char
epstr
[
TSDB_EP_LEN
+
1
]
=
{
0
};
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
pDnodeEp
->
ep
.
fqdn
,
pDnodeEp
->
ep
.
port
);
...
...
@@ -304,6 +303,6 @@ static bool dmIsEpChanged(SDnode *pDnode, int32_t dnodeId, const char *ep) {
}
}
taosRUnLockLatch
(
&
p
Dnode
->
data
.
latch
);
taosRUnLockLatch
(
&
p
Mgmt
->
data
.
latch
);
return
changed
;
}
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
0 → 100644
浏览文件 @
add51b49
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
static
void
dmUpdateDnodeCfg
(
SDnodeMgmt
*
pMgmt
,
SDnodeCfg
*
pCfg
)
{
if
(
pMgmt
->
data
.
dnodeId
==
0
||
pMgmt
->
data
.
clusterId
==
0
)
{
dInfo
(
"set dnodeId:%d clusterId:%"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
taosWLockLatch
(
&
pMgmt
->
data
.
latch
);
pMgmt
->
data
.
dnodeId
=
pCfg
->
dnodeId
;
pMgmt
->
data
.
clusterId
=
pCfg
->
clusterId
;
dmWriteEps
(
pMgmt
);
taosWUnLockLatch
(
&
pMgmt
->
data
.
latch
);
}
}
static
int32_t
dmProcessStatusRsp
(
SDnodeMgmt
*
pMgmt
,
SRpcMsg
*
pRsp
)
{
if
(
pRsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRsp
->
code
==
TSDB_CODE_MND_DNODE_NOT_EXIST
&&
!
pMgmt
->
data
.
dropped
&&
pMgmt
->
data
.
dnodeId
>
0
)
{
dInfo
(
"dnode:%d, set to dropped since not exist in mnode"
,
pMgmt
->
data
.
dnodeId
);
pMgmt
->
data
.
dropped
=
1
;
dmWriteEps
(
pMgmt
);
}
}
else
{
SStatusRsp
statusRsp
=
{
0
};
if
(
pRsp
->
pCont
!=
NULL
&&
pRsp
->
contLen
>
0
&&
tDeserializeSStatusRsp
(
pRsp
->
pCont
,
pRsp
->
contLen
,
&
statusRsp
)
==
0
)
{
pMgmt
->
data
.
dnodeVer
=
statusRsp
.
dnodeVer
;
dmUpdateDnodeCfg
(
pMgmt
,
&
statusRsp
.
dnodeCfg
);
dmUpdateEps
(
pMgmt
,
statusRsp
.
pDnodeEps
);
}
rpcFreeCont
(
pRsp
->
pCont
);
tFreeSStatusRsp
(
&
statusRsp
);
}
return
TSDB_CODE_SUCCESS
;
}
void
dmSendStatusReq
(
SDnodeMgmt
*
pMgmt
)
{
SStatusReq
req
=
{
0
};
taosRLockLatch
(
&
pMgmt
->
data
.
latch
);
req
.
sver
=
tsVersion
;
req
.
dnodeVer
=
pMgmt
->
data
.
dnodeVer
;
req
.
dnodeId
=
pMgmt
->
data
.
dnodeId
;
req
.
clusterId
=
pMgmt
->
data
.
clusterId
;
if
(
req
.
clusterId
==
0
)
req
.
dnodeId
=
0
;
req
.
rebootTime
=
pMgmt
->
data
.
rebootTime
;
req
.
updateTime
=
pMgmt
->
data
.
updateTime
;
req
.
numOfCores
=
tsNumOfCores
;
req
.
numOfSupportVnodes
=
pMgmt
->
data
.
supportVnodes
;
tstrncpy
(
req
.
dnodeEp
,
pMgmt
->
data
.
localEp
,
TSDB_EP_LEN
);
req
.
clusterCfg
.
statusInterval
=
tsStatusInterval
;
req
.
clusterCfg
.
checkTime
=
0
;
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
req
.
clusterCfg
.
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
memcpy
(
req
.
clusterCfg
.
timezone
,
tsTimezoneStr
,
TD_TIMEZONE_LEN
);
memcpy
(
req
.
clusterCfg
.
locale
,
tsLocale
,
TD_LOCALE_LEN
);
memcpy
(
req
.
clusterCfg
.
charset
,
tsCharset
,
TD_LOCALE_LEN
);
taosRUnLockLatch
(
&
pMgmt
->
data
.
latch
);
SMonVloadInfo
vinfo
=
{
0
};
dmGetVnodeLoads
(
pMgmt
,
&
vinfo
);
req
.
pVloads
=
vinfo
.
pVloads
;
pMgmt
->
data
.
unsyncedVgId
=
0
;
pMgmt
->
data
.
vndState
=
TAOS_SYNC_STATE_LEADER
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
req
.
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
req
.
pVloads
,
i
);
if
(
pLoad
->
syncState
!=
TAOS_SYNC_STATE_LEADER
&&
pLoad
->
syncState
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
pMgmt
->
data
.
unsyncedVgId
=
pLoad
->
vgId
;
pMgmt
->
data
.
vndState
=
pLoad
->
syncState
;
}
}
SMonMloadInfo
minfo
=
{
0
};
dmGetMnodeLoads
(
pMgmt
,
&
minfo
);
pMgmt
->
data
.
isMnode
=
minfo
.
isMnode
;
pMgmt
->
data
.
mndState
=
minfo
.
load
.
syncState
;
int32_t
contLen
=
tSerializeSStatusReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
tSerializeSStatusReq
(
pHead
,
contLen
,
&
req
);
tFreeSStatusReq
(
&
req
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pHead
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_STATUS
,
.
ahandle
=
(
void
*
)
0x9527
};
SRpcMsg
rpcRsp
=
{
0
};
dTrace
(
"send req:%s to mnode, app:%p"
,
TMSG_INFO
(
rpcMsg
.
msgType
),
rpcMsg
.
ahandle
);
tmsgSendMnodeRecv
(
&
rpcMsg
,
&
rpcRsp
);
dmProcessStatusRsp
(
pMgmt
,
&
rpcRsp
);
}
int32_t
dmProcessAuthRsp
(
SDnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pRsp
=
&
pMsg
->
rpcMsg
;
dError
(
"auth rsp is received, but not supported yet"
);
return
0
;
}
int32_t
dmProcessGrantRsp
(
SDnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pRsp
=
&
pMsg
->
rpcMsg
;
dError
(
"grant rsp is received, but not supported yet"
);
return
0
;
}
int32_t
dmProcessConfigReq
(
SDnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pReq
=
&
pMsg
->
rpcMsg
;
SDCfgDnodeReq
*
pCfg
=
pReq
->
pCont
;
dError
(
"config req is received, but not supported yet"
);
return
TSDB_CODE_OPS_NOT_SUPPORT
;
}
SArray
*
dmGetMsgHandles
()
{
int32_t
code
=
-
1
;
SArray
*
pArray
=
taosArrayInit
(
16
,
sizeof
(
SMgmtHandle
));
if
(
pArray
==
NULL
)
goto
_OVER
;
// Requests handled by DNODE
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_MNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_MNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_QNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_QNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_SNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_SNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_BNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_BNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CONFIG_DNODE
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
// Requests handled by MNODE
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GRANT_RSP
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_AUTH_RSP
,
dmPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
code
=
0
;
_OVER:
if
(
code
!=
0
)
{
taosArrayDestroy
(
pArray
);
return
NULL
;
}
else
{
return
pArray
;
}
}
source/dnode/mgmt/mgmt_dnode/src/dmInt.c
0 → 100644
浏览文件 @
add51b49
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
static
int32_t
dmStartMgmt
(
SDnodeMgmt
*
pMgmt
)
{
if
(
dmStartStatusThread
(
pMgmt
)
!=
0
)
{
return
-
1
;
}
if
(
dmStartMonitorThread
(
pMgmt
)
!=
0
)
{
return
-
1
;
}
return
0
;
}
static
void
dmStopMgmt
(
SDnodeMgmt
*
pMgmt
)
{
dmStopMonitorThread
(
pMgmt
);
dmStopStatusThread
(
pMgmt
);
}
static
int32_t
dmOpenMgmt
(
const
SMgmtInputOpt
*
pInput
,
SMgmtOutputOpt
*
pOutput
)
{
dInfo
(
"dnode-mgmt start to init"
);
SDnodeMgmt
*
pMgmt
=
taosMemoryCalloc
(
1
,
sizeof
(
SDnodeMgmt
));
if
(
pMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pMgmt
->
data
.
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
if
(
pMgmt
->
data
.
dnodeHash
==
NULL
)
{
dError
(
"failed to init dnode hash"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
dmReadEps
(
pMgmt
)
!=
0
)
{
dError
(
"failed to read file since %s"
,
terrstr
());
return
-
1
;
}
if
(
pMgmt
->
data
.
dropped
)
{
dError
(
"dnode will not start since its already dropped"
);
return
-
1
;
}
if
(
dmStartWorker
(
pMgmt
)
!=
0
)
{
return
-
1
;
}
if
(
udfStartUdfd
(
pMgmt
->
data
.
dnodeId
)
!=
0
)
{
dError
(
"failed to start udfd"
);
}
dInfo
(
"dnode-mgmt is initialized"
);
return
0
;
}
static
void
dmCloseMgmt
(
SDnodeMgmt
*
pMgmt
)
{
dInfo
(
"dnode-mgmt start to clean up"
);
dmStopWorker
(
pMgmt
);
taosWLockLatch
(
&
pMgmt
->
data
.
latch
);
if
(
pMgmt
->
data
.
dnodeEps
!=
NULL
)
{
taosArrayDestroy
(
pMgmt
->
data
.
dnodeEps
);
pMgmt
->
data
.
dnodeEps
=
NULL
;
}
if
(
pMgmt
->
data
.
dnodeHash
!=
NULL
)
{
taosHashCleanup
(
pMgmt
->
data
.
dnodeHash
);
pMgmt
->
data
.
dnodeHash
=
NULL
;
}
taosWUnLockLatch
(
&
pMgmt
->
data
.
latch
);
dInfo
(
"dnode-mgmt is cleaned up"
);
}
static
int32_t
dmRequireMgmt
(
const
SMgmtInputOpt
*
pInput
,
bool
*
required
)
{
*
required
=
true
;
return
0
;
}
SMgmtFunc
dmGetMgmtFunc
()
{
SMgmtFunc
mgmtFunc
=
{
0
};
mgmtFunc
.
openFp
=
dmOpenMgmt
;
mgmtFunc
.
closeFp
=
(
NodeCloseFp
)
dmCloseMgmt
;
mgmtFunc
.
startFp
=
(
NodeStartFp
)
dmStartMgmt
;
mgmtFunc
.
stopFp
=
(
NodeStopFp
)
dmStopMgmt
;
mgmtFunc
.
requiredFp
=
dmRequireMgmt
;
mgmtFunc
.
getHandlesFp
=
dmGetMsgHandles
;
return
mgmtFunc
;
}
source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c
0 → 100644
浏览文件 @
add51b49
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \
{ \
SRpcMsg rsp = {0}; \
SRpcMsg req = {.msgType = mtype}; \
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
tstrncpy(epset.eps[0].fqdn, pMgmt->data.localFqdn, TSDB_FQDN_LEN); \
epset.eps[0].port = pMgmt->data.serverPort; \
\
rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \
if (rsp.code == 0 && rsp.contLen > 0) { \
func(rsp.pCont, rsp.contLen, pInfo); \
} \
rpcFreeCont(rsp.pCont); \
}
static
void
dmGetMonitorBasicInfo
(
SDnodeMgmt
*
pMgmt
,
SMonBasicInfo
*
pInfo
)
{
pInfo
->
protocol
=
1
;
pInfo
->
dnode_id
=
pMgmt
->
data
.
dnodeId
;
pInfo
->
cluster_id
=
pMgmt
->
data
.
clusterId
;
tstrncpy
(
pInfo
->
dnode_ep
,
tsLocalEp
,
TSDB_EP_LEN
);
}
static
void
dmGetMonitorDnodeInfo
(
SDnodeMgmt
*
pMgmt
,
SMonDnodeInfo
*
pInfo
)
{
pInfo
->
uptime
=
(
taosGetTimestampMs
()
-
pMgmt
->
data
.
rebootTime
)
/
(
86400000
.
0
f
);
pInfo
->
has_mnode
=
(
*
pMgmt
->
isNodeDeployedFp
)(
pMgmt
->
pDnode
,
MNODE
);
pInfo
->
has_qnode
=
(
*
pMgmt
->
isNodeDeployedFp
)(
pMgmt
->
pDnode
,
QNODE
);
pInfo
->
has_snode
=
(
*
pMgmt
->
isNodeDeployedFp
)(
pMgmt
->
pDnode
,
SNODE
);
pInfo
->
has_bnode
=
(
*
pMgmt
->
isNodeDeployedFp
)(
pMgmt
->
pDnode
,
BNODE
);
tstrncpy
(
pInfo
->
logdir
.
name
,
tsLogDir
,
sizeof
(
pInfo
->
logdir
.
name
));
pInfo
->
logdir
.
size
=
tsLogSpace
.
size
;
tstrncpy
(
pInfo
->
tempdir
.
name
,
tsTempDir
,
sizeof
(
pInfo
->
tempdir
.
name
));
pInfo
->
tempdir
.
size
=
tsTempSpace
.
size
;
}
static
void
dmGetMonitorInfo
(
SDnodeMgmt
*
pMgmt
,
SMonDmInfo
*
pInfo
)
{
dmGetMonitorBasicInfo
(
pMgmt
,
&
pInfo
->
basic
);
dmGetMonitorDnodeInfo
(
pMgmt
,
&
pInfo
->
dnode
);
dmGetMonitorSystemInfo
(
&
pInfo
->
sys
);
}
void
dmSendMonitorReport
(
SDnodeMgmt
*
pMgmt
)
{
if
(
!
tsEnableMonitor
||
tsMonitorFqdn
[
0
]
==
0
||
tsMonitorPort
==
0
)
return
;
dTrace
(
"send monitor report to %s:%u"
,
tsMonitorFqdn
,
tsMonitorPort
);
SMonDmInfo
dmInfo
=
{
0
};
SMonMmInfo
mmInfo
=
{
0
};
SMonVmInfo
vmInfo
=
{
0
};
SMonQmInfo
qmInfo
=
{
0
};
SMonSmInfo
smInfo
=
{
0
};
SMonBmInfo
bmInfo
=
{
0
};
dmGetMonitorInfo
(
pMgmt
,
&
dmInfo
);
dmSendLocalRecv
(
pMgmt
,
TDMT_MON_VM_INFO
,
tDeserializeSMonVmInfo
,
&
vmInfo
);
if
(
dmInfo
.
dnode
.
has_mnode
)
{
dmSendLocalRecv
(
pMgmt
,
TDMT_MON_MM_INFO
,
tDeserializeSMonMmInfo
,
&
mmInfo
);
}
if
(
dmInfo
.
dnode
.
has_qnode
)
{
dmSendLocalRecv
(
pMgmt
,
TDMT_MON_QM_INFO
,
tDeserializeSMonQmInfo
,
&
qmInfo
);
}
if
(
dmInfo
.
dnode
.
has_snode
)
{
dmSendLocalRecv
(
pMgmt
,
TDMT_MON_SM_INFO
,
tDeserializeSMonSmInfo
,
&
smInfo
);
}
if
(
dmInfo
.
dnode
.
has_bnode
)
{
dmSendLocalRecv
(
pMgmt
,
TDMT_MON_BM_INFO
,
tDeserializeSMonBmInfo
,
&
bmInfo
);
}
monSetDmInfo
(
&
dmInfo
);
monSetMmInfo
(
&
mmInfo
);
monSetVmInfo
(
&
vmInfo
);
monSetQmInfo
(
&
qmInfo
);
monSetSmInfo
(
&
smInfo
);
monSetBmInfo
(
&
bmInfo
);
tFreeSMonMmInfo
(
&
mmInfo
);
tFreeSMonVmInfo
(
&
vmInfo
);
tFreeSMonQmInfo
(
&
qmInfo
);
tFreeSMonSmInfo
(
&
smInfo
);
tFreeSMonBmInfo
(
&
bmInfo
);
monSendReport
();
}
void
dmGetVnodeLoads
(
SDnodeMgmt
*
pMgmt
,
SMonVloadInfo
*
pInfo
)
{
dmSendLocalRecv
(
pMgmt
,
TDMT_MON_VM_LOAD
,
tDeserializeSMonVloadInfo
,
pInfo
);
}
void
dmGetMnodeLoads
(
SDnodeMgmt
*
pMgmt
,
SMonMloadInfo
*
pInfo
)
{
dmSendLocalRecv
(
pMgmt
,
TDMT_MON_MM_LOAD
,
tDeserializeSMonMloadInfo
,
pInfo
);
}
source/dnode/mgmt/
node_mgmt
/src/dmWorker.c
→
source/dnode/mgmt/
mgmt_dnode
/src/dmWorker.c
浏览文件 @
add51b49
...
...
@@ -14,11 +14,11 @@
*/
#define _DEFAULT_SOURCE
#include "dmI
mp
.h"
#include "dmI
nt
.h"
static
void
*
dmStatusThreadFp
(
void
*
param
)
{
SDnode
*
pDnode
=
param
;
int64_t
lastTime
=
taosGetTimestampMs
();
SDnode
Mgmt
*
pMgmt
=
param
;
int64_t
lastTime
=
taosGetTimestampMs
();
setThreadName
(
"dnode-status"
);
...
...
@@ -26,14 +26,14 @@ static void *dmStatusThreadFp(void *param) {
taosThreadTestCancel
();
taosMsleep
(
200
);
if
(
p
Dnode
->
status
!=
DND_STAT_RUNNING
||
pDnode
->
data
.
dropped
)
{
if
(
p
Mgmt
->
data
.
status
!=
DND_STAT_RUNNING
||
pMgmt
->
data
.
dropped
)
{
continue
;
}
int64_t
curTime
=
taosGetTimestampMs
();
float
interval
=
(
curTime
-
lastTime
)
/
1000
.
0
f
;
if
(
interval
>=
tsStatusInterval
)
{
dmSendStatusReq
(
p
Dnode
);
dmSendStatusReq
(
p
Mgmt
);
lastTime
=
curTime
;
}
}
...
...
@@ -42,8 +42,8 @@ static void *dmStatusThreadFp(void *param) {
}
static
void
*
dmMonitorThreadFp
(
void
*
param
)
{
SDnode
*
pDnode
=
param
;
int64_t
lastTime
=
taosGetTimestampMs
();
SDnode
Mgmt
*
pMgmt
=
param
;
int64_t
lastTime
=
taosGetTimestampMs
();
setThreadName
(
"dnode-monitor"
);
...
...
@@ -51,14 +51,14 @@ static void *dmMonitorThreadFp(void *param) {
taosThreadTestCancel
();
taosMsleep
(
200
);
if
(
p
Dnode
->
status
!=
DND_STAT_RUNNING
||
pDnode
->
data
.
dropped
)
{
if
(
p
Mgmt
->
data
.
status
!=
DND_STAT_RUNNING
||
pMgmt
->
data
.
dropped
)
{
continue
;
}
int64_t
curTime
=
taosGetTimestampMs
();
float
interval
=
(
curTime
-
lastTime
)
/
1000
.
0
f
;
if
(
interval
>=
tsMonitorInterval
)
{
dmSendMonitorReport
(
p
Dnode
);
dmSendMonitorReport
(
p
Mgmt
);
lastTime
=
curTime
;
}
}
...
...
@@ -66,46 +66,46 @@ static void *dmMonitorThreadFp(void *param) {
return
NULL
;
}
int32_t
dmStartStatusThread
(
SDnode
*
pDnode
)
{
p
Dnode
->
data
.
statusThreadId
=
taosCreateThread
(
dmStatusThreadFp
,
pDnode
);
if
(
p
Dnode
->
data
.
statusThreadId
==
NULL
)
{
int32_t
dmStartStatusThread
(
SDnode
Mgmt
*
pMgmt
)
{
p
Mgmt
->
statusThreadId
=
taosCreateThread
(
dmStatusThreadFp
,
pMgmt
);
if
(
p
Mgmt
->
statusThreadId
==
NULL
)
{
dError
(
"failed to init dnode status thread"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
dmReportStartup
(
pDnode
,
"dnode-status"
,
"initialized"
);
tmsgReportStartup
(
"dnode-status"
,
"initialized"
);
return
0
;
}
void
dmStopStatusThread
(
SDnode
*
pDnode
)
{
if
(
p
Dnode
->
data
.
statusThreadId
!=
NULL
)
{
taosDestoryThread
(
p
Dnode
->
data
.
statusThreadId
);
p
Dnode
->
data
.
statusThreadId
=
NULL
;
void
dmStopStatusThread
(
SDnode
Mgmt
*
pMgmt
)
{
if
(
p
Mgmt
->
statusThreadId
!=
NULL
)
{
taosDestoryThread
(
p
Mgmt
->
statusThreadId
);
p
Mgmt
->
statusThreadId
=
NULL
;
}
}
int32_t
dmStartMonitorThread
(
SDnode
*
pDnode
)
{
p
Dnode
->
data
.
monitorThreadId
=
taosCreateThread
(
dmMonitorThreadFp
,
pDnode
);
if
(
p
Dnode
->
data
.
monitorThreadId
==
NULL
)
{
int32_t
dmStartMonitorThread
(
SDnode
Mgmt
*
pMgmt
)
{
p
Mgmt
->
monitorThreadId
=
taosCreateThread
(
dmMonitorThreadFp
,
pMgmt
);
if
(
p
Mgmt
->
monitorThreadId
==
NULL
)
{
dError
(
"failed to init dnode monitor thread"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
dmReportStartup
(
pDnode
,
"dnode-monitor"
,
"initialized"
);
tmsgReportStartup
(
"dnode-monitor"
,
"initialized"
);
return
0
;
}
void
dmStopMonitorThread
(
SDnode
*
pDnode
)
{
if
(
p
Dnode
->
data
.
monitorThreadId
!=
NULL
)
{
taosDestoryThread
(
p
Dnode
->
data
.
monitorThreadId
);
p
Dnode
->
data
.
monitorThreadId
=
NULL
;
void
dmStopMonitorThread
(
SDnode
Mgmt
*
pMgmt
)
{
if
(
p
Mgmt
->
monitorThreadId
!=
NULL
)
{
taosDestoryThread
(
p
Mgmt
->
monitorThreadId
);
p
Mgmt
->
monitorThreadId
=
NULL
;
}
}
static
void
dmProcessMgmtQueue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
SDnode
*
pDnode
=
pInfo
->
ahandle
;
SDnode
Mgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
tmsg_t
msgType
=
pMsg
->
rpcMsg
.
msgType
;
...
...
@@ -113,37 +113,37 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
switch
(
msgType
)
{
case
TDMT_DND_CONFIG_DNODE
:
code
=
dmProcessConfigReq
(
p
Dnode
,
pMsg
);
code
=
dmProcessConfigReq
(
p
Mgmt
,
pMsg
);
break
;
case
TDMT_MND_AUTH_RSP
:
code
=
dmProcessAuthRsp
(
p
Dnode
,
pMsg
);
code
=
dmProcessAuthRsp
(
p
Mgmt
,
pMsg
);
break
;
case
TDMT_MND_GRANT_RSP
:
code
=
dmProcessGrantRsp
(
p
Dnode
,
pMsg
);
code
=
dmProcessGrantRsp
(
p
Mgmt
,
pMsg
);
break
;
case
TDMT_DND_CREATE_MNODE
:
code
=
dmProcessCreateNodeReq
(
pDnode
,
MNODE
,
pMsg
);
code
=
(
*
pMgmt
->
processCreateNodeFp
)(
pMgmt
->
pDnode
,
MNODE
,
pMsg
);
break
;
case
TDMT_DND_DROP_MNODE
:
code
=
dmProcessDropNodeReq
(
pDnode
,
MNODE
,
pMsg
);
code
=
(
*
pMgmt
->
processDropNodeFp
)(
pMgmt
->
pDnode
,
MNODE
,
pMsg
);
break
;
case
TDMT_DND_CREATE_QNODE
:
code
=
dmProcessCreateNodeReq
(
pDnode
,
QNODE
,
pMsg
);
code
=
(
*
pMgmt
->
processCreateNodeFp
)(
pMgmt
->
pDnode
,
QNODE
,
pMsg
);
break
;
case
TDMT_DND_DROP_QNODE
:
code
=
dmProcessDropNodeReq
(
pDnode
,
QNODE
,
pMsg
);
code
=
(
*
pMgmt
->
processDropNodeFp
)(
pMgmt
->
pDnode
,
QNODE
,
pMsg
);
break
;
case
TDMT_DND_CREATE_SNODE
:
code
=
dmProcessCreateNodeReq
(
pDnode
,
SNODE
,
pMsg
);
code
=
(
*
pMgmt
->
processCreateNodeFp
)(
pMgmt
->
pDnode
,
SNODE
,
pMsg
);
break
;
case
TDMT_DND_DROP_SNODE
:
code
=
dmProcessDropNodeReq
(
pDnode
,
SNODE
,
pMsg
);
code
=
(
*
pMgmt
->
processDropNodeFp
)(
pMgmt
->
pDnode
,
SNODE
,
pMsg
);
break
;
case
TDMT_DND_CREATE_BNODE
:
code
=
dmProcessCreateNodeReq
(
pDnode
,
BNODE
,
pMsg
);
code
=
(
*
pMgmt
->
processCreateNodeFp
)(
pMgmt
->
pDnode
,
BNODE
,
pMsg
);
break
;
case
TDMT_DND_DROP_BNODE
:
code
=
dmProcessDropNodeReq
(
pDnode
,
BNODE
,
pMsg
);
code
=
(
*
pMgmt
->
processDropNodeFp
)(
pMgmt
->
pDnode
,
BNODE
,
pMsg
);
break
;
default:
break
;
...
...
@@ -165,15 +165,15 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem
(
pMsg
);
}
int32_t
dmStartWorker
(
SDnode
*
pDnode
)
{
int32_t
dmStartWorker
(
SDnode
Mgmt
*
pMgmt
)
{
SSingleWorkerCfg
cfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"dnode-mgmt"
,
.
fp
=
(
FItem
)
dmProcessMgmtQueue
,
.
param
=
p
Dnode
,
.
param
=
p
Mgmt
,
};
if
(
tSingleWorkerInit
(
&
p
Dnode
->
data
.
mgmtWorker
,
&
cfg
)
!=
0
)
{
if
(
tSingleWorkerInit
(
&
p
Mgmt
->
mgmtWorker
,
&
cfg
)
!=
0
)
{
dError
(
"failed to start dnode-mgmt worker since %s"
,
terrstr
());
return
-
1
;
}
...
...
@@ -182,13 +182,13 @@ int32_t dmStartWorker(SDnode *pDnode) {
return
0
;
}
void
dmStopWorker
(
SDnode
*
pDnode
)
{
tSingleWorkerCleanup
(
&
p
Dnode
->
data
.
mgmtWorker
);
void
dmStopWorker
(
SDnode
Mgmt
*
pMgmt
)
{
tSingleWorkerCleanup
(
&
p
Mgmt
->
mgmtWorker
);
dDebug
(
"dnode workers are closed"
);
}
int32_t
dmP
rocessMgmtMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SSingleWorker
*
pWorker
=
&
p
Wrapper
->
pDnode
->
data
.
mgmtWorker
;
int32_t
dmP
utNodeMsgToMgmtQueue
(
SDnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SSingleWorker
*
pWorker
=
&
p
Mgmt
->
mgmtWorker
;
dTrace
(
"msg:%p, put into worker %s"
,
pMsg
,
pWorker
->
name
);
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
return
0
;
...
...
source/dnode/mgmt/node_mgmt/CMakeLists.txt
浏览文件 @
add51b49
aux_source_directory
(
src IMPLEMENT_SRC
)
add_library
(
dnode STATIC
${
IMPLEMENT_SRC
}
)
target_link_libraries
(
dnode mgmt_bnode mgmt_mnode mgmt_qnode mgmt_snode mgmt_vnode
dnode mgmt_bnode mgmt_mnode mgmt_qnode mgmt_snode mgmt_vnode
mgmt_dnode
)
target_include_directories
(
dnode
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
\ No newline at end of file
)
source/dnode/mgmt/node_mgmt/src/dmHandle.c
已删除
100644 → 0
浏览文件 @
65e8316f
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmImp.h"
static
void
dmUpdateDnodeCfg
(
SDnode
*
pDnode
,
SDnodeCfg
*
pCfg
)
{
if
(
pDnode
->
data
.
dnodeId
==
0
||
pDnode
->
data
.
clusterId
==
0
)
{
dInfo
(
"set dnodeId:%d clusterId:%"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
taosWLockLatch
(
&
pDnode
->
data
.
latch
);
pDnode
->
data
.
dnodeId
=
pCfg
->
dnodeId
;
pDnode
->
data
.
clusterId
=
pCfg
->
clusterId
;
dmWriteEps
(
pDnode
);
taosWUnLockLatch
(
&
pDnode
->
data
.
latch
);
}
}
static
int32_t
dmProcessStatusRsp
(
SDnode
*
pDnode
,
SRpcMsg
*
pRsp
)
{
if
(
pRsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRsp
->
code
==
TSDB_CODE_MND_DNODE_NOT_EXIST
&&
!
pDnode
->
data
.
dropped
&&
pDnode
->
data
.
dnodeId
>
0
)
{
dInfo
(
"dnode:%d, set to dropped since not exist in mnode"
,
pDnode
->
data
.
dnodeId
);
pDnode
->
data
.
dropped
=
1
;
dmWriteEps
(
pDnode
);
}
}
else
{
SStatusRsp
statusRsp
=
{
0
};
if
(
pRsp
->
pCont
!=
NULL
&&
pRsp
->
contLen
>
0
&&
tDeserializeSStatusRsp
(
pRsp
->
pCont
,
pRsp
->
contLen
,
&
statusRsp
)
==
0
)
{
pDnode
->
data
.
dnodeVer
=
statusRsp
.
dnodeVer
;
dmUpdateDnodeCfg
(
pDnode
,
&
statusRsp
.
dnodeCfg
);
dmUpdateEps
(
pDnode
,
statusRsp
.
pDnodeEps
);
}
rpcFreeCont
(
pRsp
->
pCont
);
tFreeSStatusRsp
(
&
statusRsp
);
}
return
TSDB_CODE_SUCCESS
;
}
void
dmSendStatusReq
(
SDnode
*
pDnode
)
{
SStatusReq
req
=
{
0
};
taosRLockLatch
(
&
pDnode
->
data
.
latch
);
req
.
sver
=
tsVersion
;
req
.
dnodeVer
=
pDnode
->
data
.
dnodeVer
;
req
.
dnodeId
=
pDnode
->
data
.
dnodeId
;
req
.
clusterId
=
pDnode
->
data
.
clusterId
;
if
(
req
.
clusterId
==
0
)
req
.
dnodeId
=
0
;
req
.
rebootTime
=
pDnode
->
data
.
rebootTime
;
req
.
updateTime
=
pDnode
->
data
.
updateTime
;
req
.
numOfCores
=
tsNumOfCores
;
req
.
numOfSupportVnodes
=
pDnode
->
data
.
supportVnodes
;
tstrncpy
(
req
.
dnodeEp
,
pDnode
->
data
.
localEp
,
TSDB_EP_LEN
);
req
.
clusterCfg
.
statusInterval
=
tsStatusInterval
;
req
.
clusterCfg
.
checkTime
=
0
;
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
req
.
clusterCfg
.
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
memcpy
(
req
.
clusterCfg
.
timezone
,
tsTimezoneStr
,
TD_TIMEZONE_LEN
);
memcpy
(
req
.
clusterCfg
.
locale
,
tsLocale
,
TD_LOCALE_LEN
);
memcpy
(
req
.
clusterCfg
.
charset
,
tsCharset
,
TD_LOCALE_LEN
);
taosRUnLockLatch
(
&
pDnode
->
data
.
latch
);
SMonVloadInfo
vinfo
=
{
0
};
dmGetVnodeLoads
(
pDnode
,
&
vinfo
);
req
.
pVloads
=
vinfo
.
pVloads
;
pDnode
->
data
.
unsyncedVgId
=
0
;
pDnode
->
data
.
vndState
=
TAOS_SYNC_STATE_LEADER
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
req
.
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
req
.
pVloads
,
i
);
if
(
pLoad
->
syncState
!=
TAOS_SYNC_STATE_LEADER
&&
pLoad
->
syncState
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
pDnode
->
data
.
unsyncedVgId
=
pLoad
->
vgId
;
pDnode
->
data
.
vndState
=
pLoad
->
syncState
;
}
}
SMonMloadInfo
minfo
=
{
0
};
dmGetMnodeLoads
(
pDnode
,
&
minfo
);
pDnode
->
data
.
isMnode
=
minfo
.
isMnode
;
pDnode
->
data
.
mndState
=
minfo
.
load
.
syncState
;
int32_t
contLen
=
tSerializeSStatusReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
tSerializeSStatusReq
(
pHead
,
contLen
,
&
req
);
tFreeSStatusReq
(
&
req
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pHead
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_STATUS
,
.
ahandle
=
(
void
*
)
0x9527
};
SRpcMsg
rpcRsp
=
{
0
};
dTrace
(
"send req:%s to mnode, app:%p"
,
TMSG_INFO
(
rpcMsg
.
msgType
),
rpcMsg
.
ahandle
);
dmSendToMnodeRecv
(
pDnode
,
&
rpcMsg
,
&
rpcRsp
);
dmProcessStatusRsp
(
pDnode
,
&
rpcRsp
);
}
int32_t
dmProcessAuthRsp
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pRsp
=
&
pMsg
->
rpcMsg
;
dError
(
"auth rsp is received, but not supported yet"
);
return
0
;
}
int32_t
dmProcessGrantRsp
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pRsp
=
&
pMsg
->
rpcMsg
;
dError
(
"grant rsp is received, but not supported yet"
);
return
0
;
}
int32_t
dmProcessConfigReq
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pReq
=
&
pMsg
->
rpcMsg
;
SDCfgDnodeReq
*
pCfg
=
pReq
->
pCont
;
dError
(
"config req is received, but not supported yet"
);
return
TSDB_CODE_OPS_NOT_SUPPORT
;
}
int32_t
dmProcessCreateNodeReq
(
SDnode
*
pDnode
,
EDndNodeType
ntype
,
SNodeMsg
*
pMsg
)
{
SMgmtWrapper
*
pWrapper
=
dmAcquireWrapper
(
pDnode
,
ntype
);
if
(
pWrapper
!=
NULL
)
{
dmReleaseWrapper
(
pWrapper
);
terrno
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
dError
(
"failed to create node since %s"
,
terrstr
());
return
-
1
;
}
taosThreadMutexLock
(
&
pDnode
->
mutex
);
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
if
(
taosMkDir
(
pWrapper
->
path
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to create dir:%s since %s"
,
pWrapper
->
path
,
terrstr
());
return
-
1
;
}
int32_t
code
=
(
*
pWrapper
->
fp
.
createFp
)(
pWrapper
,
pMsg
);
if
(
code
!=
0
)
{
dError
(
"node:%s, failed to create since %s"
,
pWrapper
->
name
,
terrstr
());
}
else
{
dDebug
(
"node:%s, has been created"
,
pWrapper
->
name
);
(
void
)
dmOpenNode
(
pWrapper
);
pWrapper
->
required
=
true
;
pWrapper
->
deployed
=
true
;
pWrapper
->
procType
=
pDnode
->
ptype
;
}
taosThreadMutexUnlock
(
&
pDnode
->
mutex
);
return
code
;
}
int32_t
dmProcessDropNodeReq
(
SDnode
*
pDnode
,
EDndNodeType
ntype
,
SNodeMsg
*
pMsg
)
{
SMgmtWrapper
*
pWrapper
=
dmAcquireWrapper
(
pDnode
,
ntype
);
if
(
pWrapper
==
NULL
)
{
terrno
=
TSDB_CODE_NODE_NOT_DEPLOYED
;
dError
(
"failed to drop node since %s"
,
terrstr
());
return
-
1
;
}
taosThreadMutexLock
(
&
pDnode
->
mutex
);
int32_t
code
=
(
*
pWrapper
->
fp
.
dropFp
)(
pWrapper
,
pMsg
);
if
(
code
!=
0
)
{
dError
(
"node:%s, failed to drop since %s"
,
pWrapper
->
name
,
terrstr
());
}
else
{
dDebug
(
"node:%s, has been dropped"
,
pWrapper
->
name
);
pWrapper
->
required
=
false
;
pWrapper
->
deployed
=
false
;
}
dmReleaseWrapper
(
pWrapper
);
if
(
code
==
0
)
{
dmCloseNode
(
pWrapper
);
taosRemoveDir
(
pWrapper
->
path
);
}
taosThreadMutexUnlock
(
&
pDnode
->
mutex
);
return
code
;
}
static
void
dmSetMgmtMsgHandle
(
SMgmtWrapper
*
pWrapper
)
{
// Requests handled by DNODE
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_CREATE_MNODE
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_DROP_MNODE
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_CREATE_QNODE
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_DROP_QNODE
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_CREATE_SNODE
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_DROP_SNODE
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_CREATE_BNODE
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_DROP_BNODE
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_CONFIG_DNODE
,
dmProcessMgmtMsg
,
0
);
// Requests handled by MNODE
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_GRANT_RSP
,
dmProcessMgmtMsg
,
0
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_AUTH_RSP
,
dmProcessMgmtMsg
,
0
);
}
static
int32_t
dmStartMgmt
(
SMgmtWrapper
*
pWrapper
)
{
if
(
dmStartStatusThread
(
pWrapper
->
pDnode
)
!=
0
)
{
return
-
1
;
}
if
(
dmStartMonitorThread
(
pWrapper
->
pDnode
)
!=
0
)
{
return
-
1
;
}
return
0
;
}
static
void
dmStopMgmt
(
SMgmtWrapper
*
pWrapper
)
{
dmStopMonitorThread
(
pWrapper
->
pDnode
);
dmStopStatusThread
(
pWrapper
->
pDnode
);
}
static
int32_t
dmInitMgmt
(
SMgmtWrapper
*
pWrapper
)
{
dInfo
(
"dnode-mgmt start to init"
);
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
SMonCfg
monCfg
=
{
0
};
monCfg
.
maxLogs
=
tsMonitorMaxLogs
;
monCfg
.
port
=
tsMonitorPort
;
monCfg
.
server
=
tsMonitorFqdn
;
monCfg
.
comp
=
tsMonitorComp
;
if
(
monInit
(
&
monCfg
)
!=
0
)
{
dError
(
"failed to init monitor since %s"
,
terrstr
());
return
-
1
;
}
pDnode
->
data
.
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
if
(
pDnode
->
data
.
dnodeHash
==
NULL
)
{
dError
(
"failed to init dnode hash"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
dmReadEps
(
pDnode
)
!=
0
)
{
dError
(
"failed to read file since %s"
,
terrstr
());
return
-
1
;
}
if
(
pDnode
->
data
.
dropped
)
{
dError
(
"dnode will not start since its already dropped"
);
return
-
1
;
}
if
(
dmStartWorker
(
pDnode
)
!=
0
)
{
return
-
1
;
}
if
(
dmInitServer
(
pDnode
)
!=
0
)
{
dError
(
"failed to init transport since %s"
,
terrstr
());
return
-
1
;
}
dmReportStartup
(
pDnode
,
"dnode-transport"
,
"initialized"
);
if
(
udfStartUdfd
(
pDnode
->
data
.
dnodeId
)
!=
0
)
{
dError
(
"failed to start udfd"
);
}
dInfo
(
"dnode-mgmt is initialized"
);
return
0
;
}
static
void
dmCleanupMgmt
(
SMgmtWrapper
*
pWrapper
)
{
dInfo
(
"dnode-mgmt start to clean up"
);
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
udfStopUdfd
();
dmStopWorker
(
pDnode
);
taosWLockLatch
(
&
pDnode
->
data
.
latch
);
if
(
pDnode
->
data
.
dnodeEps
!=
NULL
)
{
taosArrayDestroy
(
pDnode
->
data
.
dnodeEps
);
pDnode
->
data
.
dnodeEps
=
NULL
;
}
if
(
pDnode
->
data
.
dnodeHash
!=
NULL
)
{
taosHashCleanup
(
pDnode
->
data
.
dnodeHash
);
pDnode
->
data
.
dnodeHash
=
NULL
;
}
taosWUnLockLatch
(
&
pDnode
->
data
.
latch
);
dmCleanupClient
(
pDnode
);
dmCleanupServer
(
pDnode
);
dInfo
(
"dnode-mgmt is cleaned up"
);
}
static
int32_t
dmRequireMgmt
(
SMgmtWrapper
*
pWrapper
,
bool
*
required
)
{
*
required
=
true
;
return
0
;
}
void
dmInitWrapper
(
SMgmtWrapper
*
pWrapper
)
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
dmInitMgmt
;
mgmtFp
.
closeFp
=
dmCleanupMgmt
;
mgmtFp
.
startFp
=
dmStartMgmt
;
mgmtFp
.
stopFp
=
dmStopMgmt
;
mgmtFp
.
requiredFp
=
dmRequireMgmt
;
dmSetMgmtMsgHandle
(
pWrapper
);
pWrapper
->
name
=
"dnode"
;
pWrapper
->
fp
=
mgmtFp
;
}
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
已删除
100644 → 0
浏览文件 @
65e8316f
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmImp.h"
static
void
dmGetMonitorBasicInfo
(
SDnode
*
pDnode
,
SMonBasicInfo
*
pInfo
)
{
pInfo
->
protocol
=
1
;
pInfo
->
dnode_id
=
pDnode
->
data
.
dnodeId
;
pInfo
->
cluster_id
=
pDnode
->
data
.
clusterId
;
tstrncpy
(
pInfo
->
dnode_ep
,
tsLocalEp
,
TSDB_EP_LEN
);
}
static
void
dmGetMonitorDnodeInfo
(
SDnode
*
pDnode
,
SMonDnodeInfo
*
pInfo
)
{
pInfo
->
uptime
=
(
taosGetTimestampMs
()
-
pDnode
->
data
.
rebootTime
)
/
(
86400000
.
0
f
);
pInfo
->
has_mnode
=
pDnode
->
wrappers
[
MNODE
].
required
;
pInfo
->
has_qnode
=
pDnode
->
wrappers
[
QNODE
].
required
;
pInfo
->
has_snode
=
pDnode
->
wrappers
[
SNODE
].
required
;
pInfo
->
has_bnode
=
pDnode
->
wrappers
[
BNODE
].
required
;
tstrncpy
(
pInfo
->
logdir
.
name
,
tsLogDir
,
sizeof
(
pInfo
->
logdir
.
name
));
pInfo
->
logdir
.
size
=
tsLogSpace
.
size
;
tstrncpy
(
pInfo
->
tempdir
.
name
,
tsTempDir
,
sizeof
(
pInfo
->
tempdir
.
name
));
pInfo
->
tempdir
.
size
=
tsTempSpace
.
size
;
}
static
void
dmGetMonitorInfo
(
SDnode
*
pDnode
,
SMonDmInfo
*
pInfo
)
{
dmGetMonitorBasicInfo
(
pDnode
,
&
pInfo
->
basic
);
dmGetMonitorSysInfo
(
&
pInfo
->
sys
);
dmGetMonitorDnodeInfo
(
pDnode
,
&
pInfo
->
dnode
);
}
void
dmSendMonitorReport
(
SDnode
*
pDnode
)
{
if
(
!
tsEnableMonitor
||
tsMonitorFqdn
[
0
]
==
0
||
tsMonitorPort
==
0
)
return
;
dTrace
(
"send monitor report to %s:%u"
,
tsMonitorFqdn
,
tsMonitorPort
);
SMonDmInfo
dmInfo
=
{
0
};
SMonMmInfo
mmInfo
=
{
0
};
SMonVmInfo
vmInfo
=
{
0
};
SMonQmInfo
qmInfo
=
{
0
};
SMonSmInfo
smInfo
=
{
0
};
SMonBmInfo
bmInfo
=
{
0
};
SRpcMsg
req
=
{
0
};
SRpcMsg
rsp
;
SEpSet
epset
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
tstrncpy
(
epset
.
eps
[
0
].
fqdn
,
pDnode
->
data
.
localFqdn
,
TSDB_FQDN_LEN
);
epset
.
eps
[
0
].
port
=
tsServerPort
;
SMgmtWrapper
*
pWrapper
=
NULL
;
dmGetMonitorInfo
(
pDnode
,
&
dmInfo
);
bool
getFromAPI
=
!
tsMultiProcess
;
pWrapper
=
&
pDnode
->
wrappers
[
MNODE
];
if
(
getFromAPI
)
{
if
(
dmMarkWrapper
(
pWrapper
)
==
0
)
{
mmGetMonitorInfo
(
pWrapper
,
&
mmInfo
);
dmReleaseWrapper
(
pWrapper
);
}
}
else
{
if
(
pWrapper
->
required
)
{
req
.
msgType
=
TDMT_MON_MM_INFO
;
dmSendRecv
(
pDnode
,
&
epset
,
&
req
,
&
rsp
);
if
(
rsp
.
code
==
0
&&
rsp
.
contLen
>
0
)
{
tDeserializeSMonMmInfo
(
rsp
.
pCont
,
rsp
.
contLen
,
&
mmInfo
);
}
rpcFreeCont
(
rsp
.
pCont
);
}
}
pWrapper
=
&
pDnode
->
wrappers
[
VNODE
];
if
(
getFromAPI
)
{
if
(
dmMarkWrapper
(
pWrapper
)
==
0
)
{
vmGetMonitorInfo
(
pWrapper
,
&
vmInfo
);
dmReleaseWrapper
(
pWrapper
);
}
}
else
{
if
(
pWrapper
->
required
)
{
req
.
msgType
=
TDMT_MON_VM_INFO
;
dmSendRecv
(
pDnode
,
&
epset
,
&
req
,
&
rsp
);
if
(
rsp
.
code
==
0
&&
rsp
.
contLen
>
0
)
{
tDeserializeSMonVmInfo
(
rsp
.
pCont
,
rsp
.
contLen
,
&
vmInfo
);
}
rpcFreeCont
(
rsp
.
pCont
);
}
}
pWrapper
=
&
pDnode
->
wrappers
[
QNODE
];
if
(
getFromAPI
)
{
if
(
dmMarkWrapper
(
pWrapper
)
==
0
)
{
qmGetMonitorInfo
(
pWrapper
,
&
qmInfo
);
dmReleaseWrapper
(
pWrapper
);
}
}
else
{
if
(
pWrapper
->
required
)
{
req
.
msgType
=
TDMT_MON_QM_INFO
;
dmSendRecv
(
pDnode
,
&
epset
,
&
req
,
&
rsp
);
if
(
rsp
.
code
==
0
&&
rsp
.
contLen
>
0
)
{
tDeserializeSMonQmInfo
(
rsp
.
pCont
,
rsp
.
contLen
,
&
qmInfo
);
}
rpcFreeCont
(
rsp
.
pCont
);
}
}
pWrapper
=
&
pDnode
->
wrappers
[
SNODE
];
if
(
getFromAPI
)
{
if
(
dmMarkWrapper
(
pWrapper
)
==
0
)
{
smGetMonitorInfo
(
pWrapper
,
&
smInfo
);
dmReleaseWrapper
(
pWrapper
);
}
}
else
{
if
(
pWrapper
->
required
)
{
req
.
msgType
=
TDMT_MON_SM_INFO
;
dmSendRecv
(
pDnode
,
&
epset
,
&
req
,
&
rsp
);
if
(
rsp
.
code
==
0
&&
rsp
.
contLen
>
0
)
{
tDeserializeSMonSmInfo
(
rsp
.
pCont
,
rsp
.
contLen
,
&
smInfo
);
}
rpcFreeCont
(
rsp
.
pCont
);
}
}
pWrapper
=
&
pDnode
->
wrappers
[
BNODE
];
if
(
getFromAPI
)
{
if
(
dmMarkWrapper
(
pWrapper
)
==
0
)
{
bmGetMonitorInfo
(
pWrapper
,
&
bmInfo
);
dmReleaseWrapper
(
pWrapper
);
}
}
else
{
if
(
pWrapper
->
required
)
{
req
.
msgType
=
TDMT_MON_BM_INFO
;
dmSendRecv
(
pDnode
,
&
epset
,
&
req
,
&
rsp
);
if
(
rsp
.
code
==
0
&&
rsp
.
contLen
>
0
)
{
tDeserializeSMonBmInfo
(
rsp
.
pCont
,
rsp
.
contLen
,
&
bmInfo
);
}
rpcFreeCont
(
rsp
.
pCont
);
}
}
monSetDmInfo
(
&
dmInfo
);
monSetMmInfo
(
&
mmInfo
);
monSetVmInfo
(
&
vmInfo
);
monSetQmInfo
(
&
qmInfo
);
monSetSmInfo
(
&
smInfo
);
monSetBmInfo
(
&
bmInfo
);
tFreeSMonMmInfo
(
&
mmInfo
);
tFreeSMonVmInfo
(
&
vmInfo
);
tFreeSMonQmInfo
(
&
qmInfo
);
tFreeSMonSmInfo
(
&
smInfo
);
tFreeSMonBmInfo
(
&
bmInfo
);
monSendReport
();
}
void
dmGetVnodeLoads
(
SDnode
*
pDnode
,
SMonVloadInfo
*
pInfo
)
{
SMgmtWrapper
*
pWrapper
=
dmAcquireWrapper
(
pDnode
,
VNODE
);
if
(
pWrapper
==
NULL
)
return
;
bool
getFromAPI
=
!
tsMultiProcess
;
if
(
getFromAPI
)
{
vmGetVnodeLoads
(
pWrapper
,
pInfo
);
}
else
{
SRpcMsg
req
=
{.
msgType
=
TDMT_MON_VM_LOAD
};
SRpcMsg
rsp
=
{
0
};
SEpSet
epset
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
tstrncpy
(
epset
.
eps
[
0
].
fqdn
,
pDnode
->
data
.
localFqdn
,
TSDB_FQDN_LEN
);
epset
.
eps
[
0
].
port
=
tsServerPort
;
dmSendRecv
(
pDnode
,
&
epset
,
&
req
,
&
rsp
);
if
(
rsp
.
code
==
0
&&
rsp
.
contLen
>
0
)
{
tDeserializeSMonVloadInfo
(
rsp
.
pCont
,
rsp
.
contLen
,
pInfo
);
}
rpcFreeCont
(
rsp
.
pCont
);
}
dmReleaseWrapper
(
pWrapper
);
}
void
dmGetMnodeLoads
(
SDnode
*
pDnode
,
SMonMloadInfo
*
pInfo
)
{
SMgmtWrapper
*
pWrapper
=
dmAcquireWrapper
(
pDnode
,
MNODE
);
if
(
pWrapper
==
NULL
)
{
pInfo
->
isMnode
=
0
;
return
;
}
bool
getFromAPI
=
!
tsMultiProcess
;
if
(
getFromAPI
)
{
mmGetMnodeLoads
(
pWrapper
,
pInfo
);
}
else
{
SRpcMsg
req
=
{.
msgType
=
TDMT_MON_MM_LOAD
};
SRpcMsg
rsp
=
{
0
};
SEpSet
epset
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
tstrncpy
(
epset
.
eps
[
0
].
fqdn
,
pDnode
->
data
.
localFqdn
,
TSDB_FQDN_LEN
);
epset
.
eps
[
0
].
port
=
tsServerPort
;
dmSendRecv
(
pDnode
,
&
epset
,
&
req
,
&
rsp
);
if
(
rsp
.
code
==
0
&&
rsp
.
contLen
>
0
)
{
tDeserializeSMonMloadInfo
(
rsp
.
pCont
,
rsp
.
contLen
,
pInfo
);
}
rpcFreeCont
(
rsp
.
pCont
);
}
dmReleaseWrapper
(
pWrapper
);
}
source/dnode/mgmt/node_util/inc/dmUtil.h
浏览文件 @
add51b49
...
...
@@ -156,31 +156,32 @@ int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm);
// common define
typedef
struct
{
int32_t
dnodeId
;
int64_t
clusterId
;
int64_t
dnodeVer
;
int64_t
updateTime
;
int64_t
rebootTime
;
int32_t
unsyncedVgId
;
ESyncState
vndState
;
ESyncState
mndState
;
bool
isMnode
;
bool
dropped
;
SEpSet
mnodeEps
;
SArray
*
dnodeEps
;
SHashObj
*
dnodeHash
;
SRWLatch
latch
;
SMsgCb
msgCb
;
TdFilePtr
lockfile
;
char
*
localEp
;
char
*
localFqdn
;
char
*
firstEp
;
char
*
secondEp
;
char
*
dataDir
;
SDiskCfg
*
disks
;
int32_t
numOfDisks
;
int32_t
supportVnodes
;
uint16_t
serverPort
;
int32_t
dnodeId
;
int64_t
clusterId
;
int64_t
dnodeVer
;
int64_t
updateTime
;
int64_t
rebootTime
;
int32_t
unsyncedVgId
;
ESyncState
vndState
;
ESyncState
mndState
;
bool
isMnode
;
bool
dropped
;
SEpSet
mnodeEps
;
SArray
*
dnodeEps
;
SHashObj
*
dnodeHash
;
SRWLatch
latch
;
SMsgCb
msgCb
;
TdFilePtr
lockfile
;
char
*
localEp
;
char
*
localFqdn
;
char
*
firstEp
;
char
*
secondEp
;
char
*
dataDir
;
SDiskCfg
*
disks
;
int32_t
numOfDisks
;
int32_t
supportVnodes
;
uint16_t
serverPort
;
EDndRunStatus
status
;
}
SDnodeData
;
#ifdef __cplusplus
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录