Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4fedc23b
T
TDengine
项目概览
taosdata
/
TDengine
12 个月 前同步成功
通知
1180
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4fedc23b
编写于
4月 12, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact(cluster): node mgmt
上级
0094f301
变更
30
隐藏空白更改
内联
并排
Showing
30 changed file
with
628 addition
and
339 deletion
+628
-339
include/common/tmsg.h
include/common/tmsg.h
+3
-3
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-4
source/dnode/mgmt/CMakeLists.txt
source/dnode/mgmt/CMakeLists.txt
+5
-2
source/dnode/mgmt/bm/bmHandle.c
source/dnode/mgmt/bm/bmHandle.c
+3
-3
source/dnode/mgmt/bm/bmInt.c
source/dnode/mgmt/bm/bmInt.c
+2
-2
source/dnode/mgmt/dm/dmFile.c
source/dnode/mgmt/dm/dmFile.c
+18
-18
source/dnode/mgmt/dm/dmHandle.c
source/dnode/mgmt/dm/dmHandle.c
+24
-24
source/dnode/mgmt/dm/dmInt.c
source/dnode/mgmt/dm/dmInt.c
+11
-12
source/dnode/mgmt/dm/dmMonitor.c
source/dnode/mgmt/dm/dmMonitor.c
+4
-4
source/dnode/mgmt/dm/dmWorker.c
source/dnode/mgmt/dm/dmWorker.c
+8
-8
source/dnode/mgmt/exe/dndMain.c
source/dnode/mgmt/exe/dndMain.c
+5
-5
source/dnode/mgmt/inc/dmInt.h
source/dnode/mgmt/inc/dmInt.h
+12
-27
source/dnode/mgmt/interface/CMakeLists.txt
source/dnode/mgmt/interface/CMakeLists.txt
+10
-0
source/dnode/mgmt/interface/inc/dndDef.h
source/dnode/mgmt/interface/inc/dndDef.h
+153
-0
source/dnode/mgmt/interface/inc/dndInt.h
source/dnode/mgmt/interface/inc/dndInt.h
+86
-0
source/dnode/mgmt/interface/inc/dndLog.h
source/dnode/mgmt/interface/inc/dndLog.h
+36
-0
source/dnode/mgmt/interface/src/dndInt.c
source/dnode/mgmt/interface/src/dndInt.c
+86
-0
source/dnode/mgmt/main/dndEnv.c
source/dnode/mgmt/main/dndEnv.c
+0
-60
source/dnode/mgmt/main/dndExec.c
source/dnode/mgmt/main/dndExec.c
+33
-33
source/dnode/mgmt/main/dndFile.c
source/dnode/mgmt/main/dndFile.c
+19
-19
source/dnode/mgmt/main/dndObj.c
source/dnode/mgmt/main/dndObj.c
+34
-43
source/dnode/mgmt/main/dndTransport.c
source/dnode/mgmt/main/dndTransport.c
+44
-44
source/dnode/mgmt/mm/mmHandle.c
source/dnode/mgmt/mm/mmHandle.c
+4
-4
source/dnode/mgmt/mm/mmInt.c
source/dnode/mgmt/mm/mmInt.c
+10
-10
source/dnode/mgmt/qm/qmHandle.c
source/dnode/mgmt/qm/qmHandle.c
+2
-2
source/dnode/mgmt/qm/qmInt.c
source/dnode/mgmt/qm/qmInt.c
+2
-2
source/dnode/mgmt/sm/smHandle.c
source/dnode/mgmt/sm/smHandle.c
+2
-2
source/dnode/mgmt/sm/smInt.c
source/dnode/mgmt/sm/smInt.c
+2
-2
source/dnode/mgmt/vm/vmInt.c
source/dnode/mgmt/vm/vmInt.c
+4
-4
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-2
未找到文件。
include/common/tmsg.h
浏览文件 @
4fedc23b
...
...
@@ -744,8 +744,8 @@ typedef struct {
}
SVnodeLoad
;
typedef
struct
{
int32_t
sver
;
// software version
int64_t
d
v
er
;
// dnode table version in sdb
int32_t
sver
;
// software version
int64_t
d
nodeV
er
;
// dnode table version in sdb
int32_t
dnodeId
;
int64_t
clusterId
;
int64_t
rebootTime
;
...
...
@@ -772,7 +772,7 @@ typedef struct {
}
SDnodeEp
;
typedef
struct
{
int64_t
d
v
er
;
int64_t
d
nodeV
er
;
SDnodeCfg
dnodeCfg
;
SArray
*
pDnodeEps
;
// Array of SDnodeEp
}
SStatusRsp
;
...
...
source/common/src/tmsg.c
浏览文件 @
4fedc23b
...
...
@@ -868,7 +868,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
// status
if
(
tEncodeI32
(
&
encoder
,
pReq
->
sver
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
d
v
er
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
d
nodeV
er
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
clusterId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
rebootTime
)
<
0
)
return
-
1
;
...
...
@@ -913,7 +913,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
// status
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
sver
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
d
v
er
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
d
nodeV
er
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
clusterId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
rebootTime
)
<
0
)
return
-
1
;
...
...
@@ -965,7 +965,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
// status
if
(
tEncodeI64
(
&
encoder
,
pRsp
->
d
v
er
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pRsp
->
d
nodeV
er
)
<
0
)
return
-
1
;
// dnode cfg
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
dnodeCfg
.
dnodeId
)
<
0
)
return
-
1
;
...
...
@@ -996,7 +996,7 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
// status
if
(
tDecodeI64
(
&
decoder
,
&
pRsp
->
d
v
er
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pRsp
->
d
nodeV
er
)
<
0
)
return
-
1
;
// cluster cfg
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
dnodeCfg
.
dnodeId
)
<
0
)
return
-
1
;
...
...
source/dnode/mgmt/CMakeLists.txt
浏览文件 @
4fedc23b
add_subdirectory
(
interface
)
aux_source_directory
(
dm DNODE_SRC
)
aux_source_directory
(
qm DNODE_SRC
)
aux_source_directory
(
bm DNODE_SRC
)
...
...
@@ -7,19 +9,20 @@ aux_source_directory(mm DNODE_SRC)
aux_source_directory
(
main DNODE_SRC
)
add_library
(
dnode STATIC
${
DNODE_SRC
}
)
target_link_libraries
(
dnode
cjson mnode vnode qnode snode bnode wal sync taos tfs monitor
dnode
dnode_interface
)
target_include_directories
(
dnode
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/dnode/mgmt"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/interface/inc"
)
aux_source_directory
(
exe EXEC_SRC
)
add_executable
(
taosd
${
EXEC_SRC
}
)
target_include_directories
(
taosd
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/in
terface/in
c"
)
target_link_libraries
(
taosd dnode
)
...
...
source/dnode/mgmt/bm/bmHandle.c
浏览文件 @
4fedc23b
...
...
@@ -53,9 +53,9 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
createReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
createReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to create bnode since %s, input:%d cur:%d"
,
terrstr
(),
createReq
.
dnodeId
,
pDnode
->
dnodeId
);
dError
(
"failed to create bnode since %s, input:%d cur:%d"
,
terrstr
(),
createReq
.
dnodeId
,
pDnode
->
d
ata
.
d
nodeId
);
return
-
1
;
}
else
{
return
dndOpenNode
(
pWrapper
);
...
...
@@ -72,7 +72,7 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
dropReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
dropReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to drop bnode since %s"
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/bm/bmInt.c
浏览文件 @
4fedc23b
...
...
@@ -113,8 +113,8 @@ void bmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
bmOpen
;
mgmtFp
.
closeFp
=
bmClose
;
mgmtFp
.
create
Msg
Fp
=
bmProcessCreateReq
;
mgmtFp
.
drop
Msg
Fp
=
bmProcessDropReq
;
mgmtFp
.
createFp
=
bmProcessCreateReq
;
mgmtFp
.
dropFp
=
bmProcessDropReq
;
mgmtFp
.
requiredFp
=
bmRequire
;
bmInitMsgHandle
(
pWrapper
);
...
...
source/dnode/mgmt/dm/dmFile.c
浏览文件 @
4fedc23b
...
...
@@ -16,11 +16,11 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
static
void
dmPrintDnodes
(
SDnode
Mgmt
*
pMgmt
);
static
bool
dmIsEpChanged
(
SDnode
Mgmt
*
pMgmt
,
int32_t
dnodeId
,
const
char
*
ep
);
static
void
dmResetDnodes
(
SDnode
Mgmt
*
pMgmt
,
SArray
*
dnodeEps
);
static
void
dmPrintDnodes
(
SDnode
Data
*
pMgmt
);
static
bool
dmIsEpChanged
(
SDnode
Data
*
pMgmt
,
int32_t
dnodeId
,
const
char
*
ep
);
static
void
dmResetDnodes
(
SDnode
Data
*
pMgmt
,
SArray
*
dnodeEps
);
int32_t
dmReadFile
(
SDnode
Mgmt
*
pMgmt
)
{
int32_t
dmReadFile
(
SDnode
Data
*
pMgmt
)
{
int32_t
code
=
TSDB_CODE_INVALID_JSON_FORMAT
;
int32_t
len
=
0
;
int32_t
maxLen
=
256
*
1024
;
...
...
@@ -62,21 +62,21 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) {
dError
(
"failed to read %s since dnodeId not found"
,
file
);
goto
PRASE_DNODE_OVER
;
}
pDnode
->
dnodeId
=
dnodeId
->
valueint
;
pDnode
->
d
ata
.
d
nodeId
=
dnodeId
->
valueint
;
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterId"
);
if
(
!
clusterId
||
clusterId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since clusterId not found"
,
file
);
goto
PRASE_DNODE_OVER
;
}
pDnode
->
clusterId
=
atoll
(
clusterId
->
valuestring
);
pDnode
->
data
.
clusterId
=
atoll
(
clusterId
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_DNODE_OVER
;
}
pDnode
->
dropped
=
dropped
->
valueint
;
pDnode
->
d
ata
.
d
ropped
=
dropped
->
valueint
;
cJSON
*
dnodes
=
cJSON_GetObjectItem
(
root
,
"dnodes"
);
if
(
!
dnodes
||
dnodes
->
type
!=
cJSON_Array
)
{
...
...
@@ -138,15 +138,15 @@ PRASE_DNODE_OVER:
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
if
(
dmIsEpChanged
(
pMgmt
,
pDnode
->
d
nodeId
,
pDnode
->
localEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
pDnode
->
localEp
,
file
);
if
(
dmIsEpChanged
(
pMgmt
,
pDnode
->
d
ata
.
dnodeId
,
pDnode
->
data
.
localEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
pDnode
->
data
.
localEp
,
file
);
return
-
1
;
}
if
(
taosArrayGetSize
(
pMgmt
->
dnodeEps
)
==
0
)
{
SDnodeEp
dnodeEp
=
{
0
};
dnodeEp
.
isMnode
=
1
;
taosGetFqdnPortFromEp
(
pDnode
->
firstEp
,
&
dnodeEp
.
ep
);
taosGetFqdnPortFromEp
(
pDnode
->
data
.
firstEp
,
&
dnodeEp
.
ep
);
taosArrayPush
(
pMgmt
->
dnodeEps
,
&
dnodeEp
);
}
...
...
@@ -156,7 +156,7 @@ PRASE_DNODE_OVER:
return
code
;
}
int32_t
dmWriteFile
(
SDnode
Mgmt
*
pMgmt
)
{
int32_t
dmWriteFile
(
SDnode
Data
*
pMgmt
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
char
file
[
PATH_MAX
];
...
...
@@ -174,9 +174,9 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
: %d,
\n
"
,
pDnode
->
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
pDnode
->
clusterId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d,
\n
"
,
pDnode
->
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
: %d,
\n
"
,
pDnode
->
d
ata
.
d
nodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
pDnode
->
data
.
clusterId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d,
\n
"
,
pDnode
->
d
ata
.
d
ropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodes
\"
: [{
\n
"
);
int32_t
numOfEps
=
(
int32_t
)
taosArrayGetSize
(
pMgmt
->
dnodeEps
);
...
...
@@ -213,7 +213,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
return
0
;
}
void
dmUpdateDnodeEps
(
SDnode
Mgmt
*
pMgmt
,
SArray
*
dnodeEps
)
{
void
dmUpdateDnodeEps
(
SDnode
Data
*
pMgmt
,
SArray
*
dnodeEps
)
{
int32_t
numOfEps
=
taosArrayGetSize
(
dnodeEps
);
if
(
numOfEps
<=
0
)
return
;
...
...
@@ -234,7 +234,7 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) {
taosWUnLockLatch
(
&
pMgmt
->
latch
);
}
static
void
dmResetDnodes
(
SDnode
Mgmt
*
pMgmt
,
SArray
*
dnodeEps
)
{
static
void
dmResetDnodes
(
SDnode
Data
*
pMgmt
,
SArray
*
dnodeEps
)
{
if
(
pMgmt
->
dnodeEps
!=
dnodeEps
)
{
SArray
*
tmp
=
pMgmt
->
dnodeEps
;
pMgmt
->
dnodeEps
=
taosArrayDup
(
dnodeEps
);
...
...
@@ -265,7 +265,7 @@ static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps) {
dmPrintDnodes
(
pMgmt
);
}
static
void
dmPrintDnodes
(
SDnode
Mgmt
*
pMgmt
)
{
static
void
dmPrintDnodes
(
SDnode
Data
*
pMgmt
)
{
int32_t
numOfEps
=
(
int32_t
)
taosArrayGetSize
(
pMgmt
->
dnodeEps
);
dDebug
(
"print dnode ep list, num:%d"
,
numOfEps
);
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
i
++
)
{
...
...
@@ -274,7 +274,7 @@ static void dmPrintDnodes(SDnodeMgmt *pMgmt) {
}
}
static
bool
dmIsEpChanged
(
SDnode
Mgmt
*
pMgmt
,
int32_t
dnodeId
,
const
char
*
ep
)
{
static
bool
dmIsEpChanged
(
SDnode
Data
*
pMgmt
,
int32_t
dnodeId
,
const
char
*
ep
)
{
bool
changed
=
false
;
taosRLockLatch
(
&
pMgmt
->
latch
);
...
...
source/dnode/mgmt/dm/dmHandle.c
浏览文件 @
4fedc23b
...
...
@@ -16,20 +16,20 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
void
dmSendStatusReq
(
SDnode
Mgmt
*
pMgmt
)
{
void
dmSendStatusReq
(
SDnode
Data
*
pMgmt
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
SStatusReq
req
=
{
0
};
taosRLockLatch
(
&
pMgmt
->
latch
);
req
.
sver
=
tsVersion
;
req
.
d
ver
=
pMgmt
->
dv
er
;
req
.
dnodeId
=
pDnode
->
dnodeId
;
req
.
clusterId
=
pDnode
->
clusterId
;
req
.
rebootTime
=
pDnode
->
rebootTime
;
req
.
d
nodeVer
=
pMgmt
->
dnodeV
er
;
req
.
dnodeId
=
pDnode
->
d
ata
.
d
nodeId
;
req
.
clusterId
=
pDnode
->
data
.
clusterId
;
req
.
rebootTime
=
pDnode
->
data
.
rebootTime
;
req
.
updateTime
=
pMgmt
->
updateTime
;
req
.
numOfCores
=
tsNumOfCores
;
req
.
numOfSupportVnodes
=
pDnode
->
numOfS
upportVnodes
;
tstrncpy
(
req
.
dnodeEp
,
pDnode
->
localEp
,
TSDB_EP_LEN
);
req
.
numOfSupportVnodes
=
pDnode
->
data
.
s
upportVnodes
;
tstrncpy
(
req
.
dnodeEp
,
pDnode
->
data
.
localEp
,
TSDB_EP_LEN
);
req
.
clusterCfg
.
statusInterval
=
tsStatusInterval
;
req
.
clusterCfg
.
checkTime
=
0
;
...
...
@@ -40,7 +40,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy
(
req
.
clusterCfg
.
charset
,
tsCharset
,
TD_LOCALE_LEN
);
taosRUnLockLatch
(
&
pMgmt
->
latch
);
SMgmtWrapper
*
pWrapper
=
dndAcquireWrapper
(
pDnode
,
VNODE
S
);
SMgmtWrapper
*
pWrapper
=
dndAcquireWrapper
(
pDnode
,
VNODE
);
if
(
pWrapper
!=
NULL
)
{
SMonVloadInfo
info
=
{
0
};
dmGetVnodeLoads
(
pWrapper
,
&
info
);
...
...
@@ -62,34 +62,34 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
tmsgSendReq
(
&
pMgmt
->
msgCb
,
&
epSet
,
&
rpcMsg
);
}
static
void
dmUpdateDnodeCfg
(
SDnode
Mgmt
*
pMgmt
,
SDnodeCfg
*
pCfg
)
{
static
void
dmUpdateDnodeCfg
(
SDnode
Data
*
pMgmt
,
SDnodeCfg
*
pCfg
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
if
(
pDnode
->
dnodeId
==
0
)
{
if
(
pDnode
->
d
ata
.
d
nodeId
==
0
)
{
dInfo
(
"set dnodeId:%d clusterId:%"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
taosWLockLatch
(
&
pMgmt
->
latch
);
pDnode
->
dnodeId
=
pCfg
->
dnodeId
;
pDnode
->
clusterId
=
pCfg
->
clusterId
;
pDnode
->
d
ata
.
d
nodeId
=
pCfg
->
dnodeId
;
pDnode
->
data
.
clusterId
=
pCfg
->
clusterId
;
dmWriteFile
(
pMgmt
);
taosWUnLockLatch
(
&
pMgmt
->
latch
);
}
}
int32_t
dmProcessStatusRsp
(
SDnode
Mgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
int32_t
dmProcessStatusRsp
(
SDnode
Data
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
SRpcMsg
*
pRsp
=
&
pMsg
->
rpcMsg
;
if
(
pRsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRsp
->
code
==
TSDB_CODE_MND_DNODE_NOT_EXIST
&&
!
pDnode
->
d
ropped
&&
pDnode
->
dnodeId
>
0
)
{
dInfo
(
"dnode:%d, set to dropped since not exist in mnode"
,
pDnode
->
dnodeId
);
pDnode
->
dropped
=
1
;
if
(
pRsp
->
code
==
TSDB_CODE_MND_DNODE_NOT_EXIST
&&
!
pDnode
->
d
ata
.
dropped
&&
pDnode
->
data
.
dnodeId
>
0
)
{
dInfo
(
"dnode:%d, set to dropped since not exist in mnode"
,
pDnode
->
d
ata
.
d
nodeId
);
pDnode
->
d
ata
.
d
ropped
=
1
;
dmWriteFile
(
pMgmt
);
}
}
else
{
SStatusRsp
statusRsp
=
{
0
};
if
(
pRsp
->
pCont
!=
NULL
&&
pRsp
->
contLen
!=
0
&&
tDeserializeSStatusRsp
(
pRsp
->
pCont
,
pRsp
->
contLen
,
&
statusRsp
)
==
0
)
{
pMgmt
->
d
ver
=
statusRsp
.
dv
er
;
pMgmt
->
d
nodeVer
=
statusRsp
.
dnodeV
er
;
dmUpdateDnodeCfg
(
pMgmt
,
&
statusRsp
.
dnodeCfg
);
dmUpdateDnodeEps
(
pMgmt
,
statusRsp
.
pDnodeEps
);
}
...
...
@@ -100,26 +100,26 @@ int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
dmProcessAuthRsp
(
SDnode
Mgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
int32_t
dmProcessAuthRsp
(
SDnode
Data
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pRsp
=
&
pMsg
->
rpcMsg
;
dError
(
"auth rsp is received, but not supported yet"
);
return
0
;
}
int32_t
dmProcessGrantRsp
(
SDnode
Mgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
int32_t
dmProcessGrantRsp
(
SDnode
Data
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pRsp
=
&
pMsg
->
rpcMsg
;
dError
(
"grant rsp is received, but not supported yet"
);
return
0
;
}
int32_t
dmProcessConfigReq
(
SDnode
Mgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
int32_t
dmProcessConfigReq
(
SDnode
Data
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pReq
=
&
pMsg
->
rpcMsg
;
SDCfgDnodeReq
*
pCfg
=
pReq
->
pCont
;
dError
(
"config req is received, but not supported yet"
);
return
TSDB_CODE_OPS_NOT_SUPPORT
;
}
static
int32_t
dmProcessCreateNodeMsg
(
SDnode
*
pDnode
,
EDndType
ntype
,
SNodeMsg
*
pMsg
)
{
static
int32_t
dmProcessCreateNodeMsg
(
SDnode
*
pDnode
,
EDnd
Node
Type
ntype
,
SNodeMsg
*
pMsg
)
{
SMgmtWrapper
*
pWrapper
=
dndAcquireWrapper
(
pDnode
,
ntype
);
if
(
pWrapper
!=
NULL
)
{
dndReleaseWrapper
(
pWrapper
);
...
...
@@ -136,7 +136,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *
return
-
1
;
}
int32_t
code
=
(
*
pWrapper
->
fp
.
create
Msg
Fp
)(
pWrapper
,
pMsg
);
int32_t
code
=
(
*
pWrapper
->
fp
.
createFp
)(
pWrapper
,
pMsg
);
if
(
code
!=
0
)
{
dError
(
"node:%s, failed to open since %s"
,
pWrapper
->
name
,
terrstr
());
}
else
{
...
...
@@ -147,7 +147,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *
return
code
;
}
static
int32_t
dmProcessDropNodeMsg
(
SDnode
*
pDnode
,
EDndType
ntype
,
SNodeMsg
*
pMsg
)
{
static
int32_t
dmProcessDropNodeMsg
(
SDnode
*
pDnode
,
EDnd
Node
Type
ntype
,
SNodeMsg
*
pMsg
)
{
SMgmtWrapper
*
pWrapper
=
dndAcquireWrapper
(
pDnode
,
ntype
);
if
(
pWrapper
==
NULL
)
{
terrno
=
TSDB_CODE_NODE_NOT_DEPLOYED
;
...
...
@@ -158,7 +158,7 @@ static int32_t dmProcessDropNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pM
taosWLockLatch
(
&
pWrapper
->
latch
);
pWrapper
->
deployed
=
false
;
int32_t
code
=
(
*
pWrapper
->
fp
.
drop
Msg
Fp
)(
pWrapper
,
pMsg
);
int32_t
code
=
(
*
pWrapper
->
fp
.
dropFp
)(
pWrapper
,
pMsg
);
if
(
code
!=
0
)
{
pWrapper
->
deployed
=
true
;
dError
(
"node:%s, failed to drop since %s"
,
pWrapper
->
name
,
terrstr
());
...
...
source/dnode/mgmt/dm/dmInt.c
浏览文件 @
4fedc23b
...
...
@@ -16,13 +16,13 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
void
dmGetMnodeEpSet
(
SDnode
Mgmt
*
pMgmt
,
SEpSet
*
pEpSet
)
{
void
dmGetMnodeEpSet
(
SDnode
Data
*
pMgmt
,
SEpSet
*
pEpSet
)
{
taosRLockLatch
(
&
pMgmt
->
latch
);
*
pEpSet
=
pMgmt
->
mnodeEpSet
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
}
void
dmUpdateMnodeEpSet
(
SDnode
Mgmt
*
pMgmt
,
SEpSet
*
pEpSet
)
{
void
dmUpdateMnodeEpSet
(
SDnode
Data
*
pMgmt
,
SEpSet
*
pEpSet
)
{
dInfo
(
"mnode is changed, num:%d use:%d"
,
pEpSet
->
numOfEps
,
pEpSet
->
inUse
);
taosWLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -35,7 +35,7 @@ void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet) {
}
void
dmGetDnodeEp
(
SMgmtWrapper
*
pWrapper
,
int32_t
dnodeId
,
char
*
pEp
,
char
*
pFqdn
,
uint16_t
*
pPort
)
{
SDnode
Mgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SDnode
Data
*
pMgmt
=
pWrapper
->
pMgmt
;
taosRLockLatch
(
&
pMgmt
->
latch
);
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
pMgmt
->
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
...
...
@@ -54,7 +54,7 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd
taosRUnLockLatch
(
&
pMgmt
->
latch
);
}
void
dmSendRedirectRsp
(
SDnode
Mgmt
*
pMgmt
,
const
SRpcMsg
*
pReq
)
{
void
dmSendRedirectRsp
(
SDnode
Data
*
pMgmt
,
const
SRpcMsg
*
pReq
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
SEpSet
epSet
=
{
0
};
...
...
@@ -63,7 +63,7 @@ void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pReq) {
dDebug
(
"RPC %p, req is redirected, num:%d use:%d"
,
pReq
->
handle
,
epSet
.
numOfEps
,
epSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
dDebug
(
"mnode index:%d %s:%u"
,
i
,
epSet
.
eps
[
i
].
fqdn
,
epSet
.
eps
[
i
].
port
);
if
(
strcmp
(
epSet
.
eps
[
i
].
fqdn
,
pDnode
->
localFqdn
)
==
0
&&
epSet
.
eps
[
i
].
port
==
pDnode
->
serverPort
)
{
if
(
strcmp
(
epSet
.
eps
[
i
].
fqdn
,
pDnode
->
data
.
localFqdn
)
==
0
&&
epSet
.
eps
[
i
].
port
==
pDnode
->
data
.
serverPort
)
{
epSet
.
inUse
=
(
i
+
1
)
%
epSet
.
numOfEps
;
}
...
...
@@ -80,15 +80,14 @@ static int32_t dmStart(SMgmtWrapper *pWrapper) {
static
int32_t
dmInit
(
SMgmtWrapper
*
pWrapper
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
SDnode
Mgmt
*
pMgmt
=
taosMemoryCalloc
(
1
,
sizeof
(
SDnodeMgmt
));
SDnode
Data
*
pMgmt
=
taosMemoryCalloc
(
1
,
sizeof
(
SDnodeData
));
dInfo
(
"dnode-mgmt start to init"
);
pDnode
->
dnodeId
=
0
;
pDnode
->
dropped
=
0
;
pDnode
->
clusterId
=
0
;
pDnode
->
d
ata
.
d
nodeId
=
0
;
pDnode
->
d
ata
.
d
ropped
=
0
;
pDnode
->
data
.
clusterId
=
0
;
pMgmt
->
path
=
pWrapper
->
path
;
pMgmt
->
pDnode
=
pDnode
;
pMgmt
->
pWrapper
=
pWrapper
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
...
...
@@ -103,7 +102,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) {
return
-
1
;
}
if
(
pDnode
->
dropped
)
{
if
(
pDnode
->
d
ata
.
d
ropped
)
{
dError
(
"dnode will not start since its already dropped"
);
return
-
1
;
}
...
...
@@ -125,7 +124,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) {
}
static
void
dmCleanup
(
SMgmtWrapper
*
pWrapper
)
{
SDnode
Mgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SDnode
Data
*
pMgmt
=
pWrapper
->
pMgmt
;
if
(
pMgmt
==
NULL
)
return
;
dInfo
(
"dnode-mgmt start to clean up"
);
...
...
source/dnode/mgmt/dm/dmMonitor.c
浏览文件 @
4fedc23b
...
...
@@ -18,13 +18,13 @@
static
void
dmGetMonitorBasicInfo
(
SDnode
*
pDnode
,
SMonBasicInfo
*
pInfo
)
{
pInfo
->
protocol
=
1
;
pInfo
->
dnode_id
=
pDnode
->
dnodeId
;
pInfo
->
cluster_id
=
pDnode
->
clusterId
;
pInfo
->
dnode_id
=
pDnode
->
d
ata
.
d
nodeId
;
pInfo
->
cluster_id
=
pDnode
->
data
.
clusterId
;
tstrncpy
(
pInfo
->
dnode_ep
,
tsLocalEp
,
TSDB_EP_LEN
);
}
static
void
dmGetMonitorDnodeInfo
(
SDnode
*
pDnode
,
SMonDnodeInfo
*
pInfo
)
{
pInfo
->
uptime
=
(
taosGetTimestampMs
()
-
pDnode
->
rebootTime
)
/
(
86400000
.
0
f
);
pInfo
->
uptime
=
(
taosGetTimestampMs
()
-
pDnode
->
data
.
rebootTime
)
/
(
86400000
.
0
f
);
pInfo
->
has_mnode
=
pDnode
->
wrappers
[
MNODE
].
required
;
pInfo
->
has_qnode
=
pDnode
->
wrappers
[
QNODE
].
required
;
pInfo
->
has_snode
=
pDnode
->
wrappers
[
SNODE
].
required
;
...
...
@@ -79,7 +79,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
}
}
pWrapper
=
&
pDnode
->
wrappers
[
VNODE
S
];
pWrapper
=
&
pDnode
->
wrappers
[
VNODE
];
if
(
getFromAPI
)
{
if
(
dndMarkWrapper
(
pWrapper
)
==
0
)
{
vmGetMonitorInfo
(
pWrapper
,
&
vmInfo
);
...
...
source/dnode/mgmt/dm/dmWorker.c
浏览文件 @
4fedc23b
...
...
@@ -17,7 +17,7 @@
#include "dmInt.h"
static
void
*
dmThreadRoutine
(
void
*
param
)
{
SDnode
Mgmt
*
pMgmt
=
param
;
SDnode
Data
*
pMgmt
=
param
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
int64_t
lastStatusTime
=
taosGetTimestampMs
();
int64_t
lastMonitorTime
=
lastStatusTime
;
...
...
@@ -27,7 +27,7 @@ static void *dmThreadRoutine(void *param) {
while
(
true
)
{
taosThreadTestCancel
();
taosMsleep
(
200
);
if
(
dndGetStatus
(
pDnode
)
!=
DND_STAT_RUNNING
||
pDnode
->
dropped
)
{
if
(
dndGetStatus
(
pDnode
)
!=
DND_STAT_RUNNING
||
pDnode
->
d
ata
.
d
ropped
)
{
continue
;
}
...
...
@@ -47,7 +47,7 @@ static void *dmThreadRoutine(void *param) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
dmStartThread
(
SDnode
Mgmt
*
pMgmt
)
{
int32_t
dmStartThread
(
SDnode
Data
*
pMgmt
)
{
pMgmt
->
threadId
=
taosCreateThread
(
dmThreadRoutine
,
pMgmt
);
if
(
pMgmt
->
threadId
==
NULL
)
{
dError
(
"failed to init dnode thread"
);
...
...
@@ -59,7 +59,7 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) {
}
static
void
dmProcessQueue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
SDnode
Mgmt
*
pMgmt
=
pInfo
->
ahandle
;
SDnode
Data
*
pMgmt
=
pInfo
->
ahandle
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
...
...
@@ -95,7 +95,7 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem
(
pMsg
);
}
int32_t
dmStartWorker
(
SDnode
Mgmt
*
pMgmt
)
{
int32_t
dmStartWorker
(
SDnode
Data
*
pMgmt
)
{
SSingleWorkerCfg
mcfg
=
{.
min
=
1
,
.
max
=
1
,
.
name
=
"dnode-mgmt"
,
.
fp
=
(
FItem
)
dmProcessQueue
,
.
param
=
pMgmt
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
mgmtWorker
,
&
mcfg
)
!=
0
)
{
dError
(
"failed to start dnode mgmt worker since %s"
,
terrstr
());
...
...
@@ -112,7 +112,7 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
return
0
;
}
void
dmStopWorker
(
SDnode
Mgmt
*
pMgmt
)
{
void
dmStopWorker
(
SDnode
Data
*
pMgmt
)
{
tSingleWorkerCleanup
(
&
pMgmt
->
mgmtWorker
);
tSingleWorkerCleanup
(
&
pMgmt
->
monitorWorker
);
...
...
@@ -124,7 +124,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
}
int32_t
dmProcessMgmtMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SDnode
Mgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SDnode
Data
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
mgmtWorker
;
dTrace
(
"msg:%p, put into worker %s"
,
pMsg
,
pWorker
->
name
);
...
...
@@ -133,7 +133,7 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t
dmProcessMonitorMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SDnode
Mgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SDnode
Data
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
monitorWorker
;
dTrace
(
"msg:%p, put into worker %s"
,
pMsg
,
pWorker
->
name
);
...
...
source/dnode/mgmt/exe/dndMain.c
浏览文件 @
4fedc23b
...
...
@@ -26,7 +26,7 @@ static struct {
char
apolloUrl
[
PATH_MAX
];
SArray
*
pArgs
;
// SConfigPair
SDnode
*
pDnode
;
EDndType
ntype
;
EDnd
Node
Type
ntype
;
}
global
=
{
0
};
static
void
dndStopDnode
(
int
signum
,
void
*
info
,
void
*
ctx
)
{
...
...
@@ -46,7 +46,7 @@ static void dndSetSignalHandle() {
taosSetSignal
(
SIGQUIT
,
dndStopDnode
);
if
(
!
tsMultiProcess
)
{
}
else
if
(
global
.
ntype
==
DNODE
||
global
.
ntype
==
NODE_MAX
)
{
}
else
if
(
global
.
ntype
==
NODE_BEGIN
||
global
.
ntype
==
NODE_END
)
{
taosIgnSignal
(
SIGCHLD
);
}
else
{
taosKillChildOnParentStopped
();
...
...
@@ -72,8 +72,8 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) {
tstrncpy
(
global
.
envFile
,
argv
[
++
i
],
PATH_MAX
);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
global
.
ntype
=
atoi
(
argv
[
++
i
]);
if
(
global
.
ntype
<=
DNODE
||
global
.
ntype
>
NODE_MAX
)
{
printf
(
"'-n' range is [1 - %d], default is 0
\n
"
,
NODE_
MAX
-
1
);
if
(
global
.
ntype
<=
NODE_BEGIN
||
global
.
ntype
>
NODE_END
)
{
printf
(
"'-n' range is [1 - %d], default is 0
\n
"
,
NODE_
END
-
1
);
return
-
1
;
}
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
)
{
...
...
@@ -135,7 +135,7 @@ static int32_t dndInitLog() {
static
void
dndSetProcInfo
(
int32_t
argc
,
char
**
argv
)
{
taosSetProcPath
(
argc
,
argv
);
if
(
global
.
ntype
!=
DNODE
&&
global
.
ntype
!=
NODE_MAX
)
{
if
(
global
.
ntype
!=
NODE_BEGIN
&&
global
.
ntype
!=
NODE_END
)
{
const
char
*
name
=
dndNodeProcStr
(
global
.
ntype
);
taosSetProcName
(
argc
,
argv
,
name
);
}
...
...
source/dnode/mgmt/inc/dmInt.h
浏览文件 @
4fedc23b
...
...
@@ -22,35 +22,20 @@
extern
"C"
{
#endif
typedef
struct
SDnodeMgmt
{
int64_t
dver
;
int64_t
updateTime
;
int8_t
statusSent
;
SEpSet
mnodeEpSet
;
SHashObj
*
dnodeHash
;
SArray
*
dnodeEps
;
TdThread
*
threadId
;
SRWLatch
latch
;
SSingleWorker
mgmtWorker
;
SSingleWorker
monitorWorker
;
SMsgCb
msgCb
;
const
char
*
path
;
SDnode
*
pDnode
;
SMgmtWrapper
*
pWrapper
;
}
SDnodeMgmt
;
// dmFile.c
int32_t
dmReadFile
(
SDnode
Mgmt
*
pMgmt
);
int32_t
dmWriteFile
(
SDnode
Mgmt
*
pMgmt
);
void
dmUpdateDnodeEps
(
SDnode
Mgmt
*
pMgmt
,
SArray
*
pDnodeEps
);
int32_t
dmReadFile
(
SDnode
Data
*
pMgmt
);
int32_t
dmWriteFile
(
SDnode
Data
*
pMgmt
);
void
dmUpdateDnodeEps
(
SDnode
Data
*
pMgmt
,
SArray
*
pDnodeEps
);
// dmHandle.c
void
dmInitMsgHandle
(
SMgmtWrapper
*
pWrapper
);
void
dmSendStatusReq
(
SDnode
Mgmt
*
pMgmt
);
int32_t
dmProcessConfigReq
(
SDnode
Mgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessStatusRsp
(
SDnode
Mgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessAuthRsp
(
SDnode
Mgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessGrantRsp
(
SDnode
Mgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
void
dmSendStatusReq
(
SDnode
Data
*
pMgmt
);
int32_t
dmProcessConfigReq
(
SDnode
Data
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessStatusRsp
(
SDnode
Data
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessAuthRsp
(
SDnode
Data
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessGrantRsp
(
SDnode
Data
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessCDnodeReq
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
);
// dmMonitor.c
...
...
@@ -58,9 +43,9 @@ void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void
dmSendMonitorReport
(
SDnode
*
pDnode
);
// dmWorker.c
int32_t
dmStartThread
(
SDnode
Mgmt
*
pMgmt
);
int32_t
dmStartWorker
(
SDnode
Mgmt
*
pMgmt
);
void
dmStopWorker
(
SDnode
Mgmt
*
pMgmt
);
int32_t
dmStartThread
(
SDnode
Data
*
pMgmt
);
int32_t
dmStartWorker
(
SDnode
Data
*
pMgmt
);
void
dmStopWorker
(
SDnode
Data
*
pMgmt
);
int32_t
dmProcessMgmtMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
int32_t
dmProcessMonitorMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
...
...
source/dnode/mgmt/interface/CMakeLists.txt
0 → 100644
浏览文件 @
4fedc23b
aux_source_directory
(
src DNODE_INTERFACE
)
add_library
(
dnode_interface STATIC
${
DNODE_INTERFACE
}
)
target_include_directories
(
dnode_interface
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/dnode/mgmt"
PUBLIC
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_link_libraries
(
dnode_interface cjson mnode vnode qnode snode bnode wal sync taos tfs monitor util
)
\ No newline at end of file
source/dnode/mgmt/interface/inc/dndDef.h
0 → 100644
浏览文件 @
4fedc23b
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DEF_H_
#define _TD_DND_DEF_H_
#include "dndLog.h"
#include "cJSON.h"
#include "tcache.h"
#include "tcrc32c.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tprocess.h"
#include "tqueue.h"
#include "trpc.h"
#include "tthread.h"
#include "ttime.h"
#include "tworker.h"
#include "dnode.h"
#include "monitor.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
enum
{
NODE_BEGIN
,
VNODE
,
QNODE
,
SNODE
,
MNODE
,
BNODE
,
NODE_END
}
EDndNodeType
;
typedef
enum
{
DND_STAT_INIT
,
DND_STAT_RUNNING
,
DND_STAT_STOPPED
}
EDndRunStatus
;
typedef
enum
{
DND_ENV_INIT
,
DND_ENV_READY
,
DND_ENV_CLEANUP
}
EDndEnvStatus
;
typedef
enum
{
DND_PROC_SINGLE
,
DND_PROC_CHILD
,
DND_PROC_PARENT
}
EDndProcType
;
typedef
int32_t
(
*
NodeMsgFp
)(
struct
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
typedef
int32_t
(
*
OpenNodeFp
)(
struct
SMgmtWrapper
*
pWrapper
);
typedef
void
(
*
CloseNodeFp
)(
struct
SMgmtWrapper
*
pWrapper
);
typedef
int32_t
(
*
StartNodeFp
)(
struct
SMgmtWrapper
*
pWrapper
);
typedef
void
(
*
StopNodeFp
)(
struct
SMgmtWrapper
*
pWrapper
);
typedef
int32_t
(
*
CreateNodeFp
)(
struct
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
typedef
int32_t
(
*
DropNodeFp
)(
struct
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
typedef
int32_t
(
*
RequireNodeFp
)(
struct
SMgmtWrapper
*
pWrapper
,
bool
*
required
);
typedef
struct
{
SMgmtWrapper
*
pQndWrapper
;
SMgmtWrapper
*
pMndWrapper
;
SMgmtWrapper
*
pNdWrapper
;
}
SMsgHandle
;
typedef
struct
{
OpenNodeFp
openFp
;
CloseNodeFp
closeFp
;
StartNodeFp
startFp
;
StopNodeFp
stopFp
;
CreateNodeFp
createFp
;
DropNodeFp
dropFp
;
RequireNodeFp
requiredFp
;
}
SMgmtFp
;
typedef
struct
SMgmtWrapper
{
SDnode
*
pDnode
;
struct
{
const
char
*
name
;
char
*
path
;
int32_t
refCount
;
SRWLatch
latch
;
EDndNodeType
ntype
;
bool
deployed
;
bool
required
;
SMgmtFp
fp
;
void
*
pMgmt
;
};
struct
{
EDndProcType
procType
;
int32_t
procId
;
SProcObj
*
procObj
;
SShm
procShm
;
};
struct
{
int8_t
msgVgIds
[
TDMT_MAX
];
// Handle the case where the same message type is distributed to qnode or vnode
NodeMsgFp
msgFps
[
TDMT_MAX
];
};
}
SMgmtWrapper
;
typedef
struct
{
void
*
serverRpc
;
void
*
clientRpc
;
SMsgHandle
msgHandles
[
TDMT_MAX
];
}
SDnodeTrans
;
typedef
struct
{
int32_t
dnodeId
;
int64_t
clusterId
;
int64_t
dnodeVer
;
int64_t
updateTime
;
int64_t
rebootTime
;
bool
dropped
;
int8_t
statusSent
;
SEpSet
mnodeEpSet
;
SHashObj
*
dnodeHash
;
SArray
*
dnodeEps
;
TdThread
*
threadId
;
SRWLatch
latch
;
SSingleWorker
mgmtWorker
;
SSingleWorker
monitorWorker
;
SMsgCb
msgCb
;
SDnode
*
pDnode
;
const
char
*
path
;
TdFilePtr
lockfile
;
struct
{
char
*
localEp
;
char
*
localFqdn
;
char
*
firstEp
;
char
*
secondEp
;
char
*
dataDir
;
SDiskCfg
*
disks
;
int32_t
numOfDisks
;
int32_t
supportVnodes
;
uint16_t
serverPort
;
};
}
SDnodeData
;
typedef
struct
SDnode
{
EDndProcType
ptype
;
EDndNodeType
ntype
;
EDndRunStatus
status
;
EDndEvent
event
;
SStartupReq
startup
;
SDnodeTrans
trans
;
SDnodeData
data
;
SMgmtWrapper
wrappers
[
NODE_END
];
}
SDnode
;
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DND_DEF_H_*/
\ No newline at end of file
source/dnode/mgmt/inc/dndInt.h
→
source/dnode/mgmt/in
terface/in
c/dndInt.h
浏览文件 @
4fedc23b
...
...
@@ -16,130 +16,17 @@
#ifndef _TD_DND_INT_H_
#define _TD_DND_INT_H_
#include "os.h"
#include "cJSON.h"
#include "tcache.h"
#include "tcrc32c.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tprocess.h"
#include "tqueue.h"
#include "trpc.h"
#include "tthread.h"
#include "ttime.h"
#include "tworker.h"
#include "dnode.h"
#include "monitor.h"
#include "dndLog.h"
#include "dndDef.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
typedef
enum
{
DNODE
,
VNODES
,
QNODE
,
SNODE
,
MNODE
,
BNODE
,
NODE_MAX
}
EDndType
;
typedef
enum
{
DND_STAT_INIT
,
DND_STAT_RUNNING
,
DND_STAT_STOPPED
}
EDndStatus
;
typedef
enum
{
DND_ENV_INIT
,
DND_ENV_READY
,
DND_ENV_CLEANUP
}
EEnvStatus
;
typedef
enum
{
PROC_SINGLE
,
PROC_CHILD
,
PROC_PARENT
}
EProcType
;
typedef
struct
SMgmtFp
SMgmtFp
;
typedef
struct
SMgmtWrapper
SMgmtWrapper
;
typedef
struct
SMsgHandle
SMsgHandle
;
typedef
struct
SDnodeMgmt
SDnodeMgmt
;
typedef
struct
SVnodesMgmt
SVnodesMgmt
;
typedef
struct
SMnodeMgmt
SMnodeMgmt
;
typedef
struct
SQnodeMgmt
SQnodeMgmt
;
typedef
struct
SSnodeMgmt
SSnodeMgmt
;
typedef
struct
SBnodeMgmt
SBnodeMgmt
;
typedef
int32_t
(
*
NodeMsgFp
)(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
typedef
int32_t
(
*
OpenNodeFp
)(
SMgmtWrapper
*
pWrapper
);
typedef
void
(
*
CloseNodeFp
)(
SMgmtWrapper
*
pWrapper
);
typedef
int32_t
(
*
StartNodeFp
)(
SMgmtWrapper
*
pWrapper
);
typedef
int32_t
(
*
CreateNodeFp
)(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
typedef
int32_t
(
*
DropNodeFp
)(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
typedef
int32_t
(
*
RequireNodeFp
)(
SMgmtWrapper
*
pWrapper
,
bool
*
required
);
typedef
struct
SMsgHandle
{
SMgmtWrapper
*
pQndWrapper
;
SMgmtWrapper
*
pMndWrapper
;
SMgmtWrapper
*
pWrapper
;
}
SMsgHandle
;
typedef
struct
SMgmtFp
{
OpenNodeFp
openFp
;
CloseNodeFp
closeFp
;
StartNodeFp
startFp
;
CreateNodeFp
createMsgFp
;
DropNodeFp
dropMsgFp
;
RequireNodeFp
requiredFp
;
}
SMgmtFp
;
typedef
struct
SMgmtWrapper
{
const
char
*
name
;
char
*
path
;
int32_t
refCount
;
SRWLatch
latch
;
EDndType
ntype
;
bool
deployed
;
bool
required
;
EProcType
procType
;
int32_t
procId
;
SProcObj
*
pProc
;
SShm
shm
;
void
*
pMgmt
;
SDnode
*
pDnode
;
SMgmtFp
fp
;
int8_t
msgVgIds
[
TDMT_MAX
];
// Handle the case where the same message type is distributed to qnode or vnode
NodeMsgFp
msgFps
[
TDMT_MAX
];
}
SMgmtWrapper
;
typedef
struct
{
void
*
serverRpc
;
void
*
clientRpc
;
SMsgHandle
msgHandles
[
TDMT_MAX
];
}
STransMgmt
;
typedef
struct
SDnode
{
int64_t
clusterId
;
int32_t
dnodeId
;
int32_t
numOfSupportVnodes
;
int64_t
rebootTime
;
char
*
localEp
;
char
*
localFqdn
;
char
*
firstEp
;
char
*
secondEp
;
char
*
dataDir
;
SDiskCfg
*
disks
;
int32_t
numOfDisks
;
uint16_t
serverPort
;
bool
dropped
;
EProcType
procType
;
EDndType
ntype
;
EDndStatus
status
;
EDndEvent
event
;
SStartupReq
startup
;
TdFilePtr
lockfile
;
STransMgmt
trans
;
SMgmtWrapper
wrappers
[
NODE_MAX
];
}
SDnode
;
// dndEnv.c
const
char
*
dndStatStr
(
EDndStatus
stat
);
const
char
*
dndNodeLogStr
(
EDndType
ntype
);
const
char
*
dndNodeProcStr
(
EDndType
ntype
);
const
char
*
dndStatStr
(
EDnd
Run
Status
stat
);
const
char
*
dndNodeLogStr
(
EDnd
Node
Type
ntype
);
const
char
*
dndNodeProcStr
(
EDnd
Node
Type
ntype
);
const
char
*
dndEventStr
(
EDndEvent
ev
);
// dndExec.c
...
...
@@ -154,10 +41,10 @@ int32_t dndReadShmFile(SDnode *pDnode);
int32_t
dndWriteShmFile
(
SDnode
*
pDnode
);
// dndInt.c
EDndStatus
dndGetStatus
(
SDnode
*
pDnode
);
void
dndSetStatus
(
SDnode
*
pDnode
,
EDndStatus
stat
);
EDnd
Run
Status
dndGetStatus
(
SDnode
*
pDnode
);
void
dndSetStatus
(
SDnode
*
pDnode
,
EDnd
Run
Status
stat
);
void
dndSetMsgHandle
(
SMgmtWrapper
*
pWrapper
,
tmsg_t
msgType
,
NodeMsgFp
nodeMsgFp
,
int8_t
vgId
);
SMgmtWrapper
*
dndAcquireWrapper
(
SDnode
*
pDnode
,
EDndType
nType
);
SMgmtWrapper
*
dndAcquireWrapper
(
SDnode
*
pDnode
,
EDnd
Node
Type
nType
);
int32_t
dndMarkWrapper
(
SMgmtWrapper
*
pWrapper
);
void
dndReleaseWrapper
(
SMgmtWrapper
*
pWrapper
);
void
dndHandleEvent
(
SDnode
*
pDnode
,
EDndEvent
event
);
...
...
@@ -180,9 +67,9 @@ void smSetMgmtFp(SMgmtWrapper *pWrapper);
void
vmSetMgmtFp
(
SMgmtWrapper
*
pWrapper
);
void
mmSetMgmtFp
(
SMgmtWrapper
*
pMgmt
);
void
dmGetMnodeEpSet
(
SDnode
Mgmt
*
pMgmt
,
SEpSet
*
pEpSet
);
void
dmUpdateMnodeEpSet
(
SDnode
Mgmt
*
pMgmt
,
SEpSet
*
pEpSet
);
void
dmSendRedirectRsp
(
SDnode
Mgmt
*
pMgmt
,
const
SRpcMsg
*
pMsg
);
void
dmGetMnodeEpSet
(
SDnode
Data
*
pMgmt
,
SEpSet
*
pEpSet
);
void
dmUpdateMnodeEpSet
(
SDnode
Data
*
pMgmt
,
SEpSet
*
pEpSet
);
void
dmSendRedirectRsp
(
SDnode
Data
*
pMgmt
,
const
SRpcMsg
*
pMsg
);
void
dmGetMonitorSysInfo
(
SMonSysInfo
*
pInfo
);
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
);
...
...
source/dnode/mgmt/interface/inc/dndLog.h
0 → 100644
浏览文件 @
4fedc23b
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_LOG_H_
#define _TD_DND_LOG_H_
#include "tlog.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DND_LOG_H_*/
\ No newline at end of file
source/dnode/mgmt/interface/src/dndInt.c
0 → 100644
浏览文件 @
4fedc23b
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndInt.h"
EDndRunStatus
dndGetStatus
(
SDnode
*
pDnode
)
{
return
pDnode
->
status
;
}
void
dndSetStatus
(
SDnode
*
pDnode
,
EDndRunStatus
status
)
{
if
(
pDnode
->
status
!=
status
)
{
dDebug
(
"dnode status set from %s to %s"
,
dndStatStr
(
pDnode
->
status
),
dndStatStr
(
status
));
pDnode
->
status
=
status
;
}
}
const
char
*
dndStatStr
(
EDndRunStatus
status
)
{
switch
(
status
)
{
case
DND_STAT_INIT
:
return
"init"
;
case
DND_STAT_RUNNING
:
return
"running"
;
case
DND_STAT_STOPPED
:
return
"stopped"
;
default:
return
"UNKNOWN"
;
}
}
const
char
*
dndNodeLogStr
(
EDndNodeType
ntype
)
{
switch
(
ntype
)
{
case
VNODE
:
return
"vnode"
;
case
QNODE
:
return
"qnode"
;
case
SNODE
:
return
"snode"
;
case
MNODE
:
return
"mnode"
;
case
BNODE
:
return
"bnode"
;
default:
return
"taosd"
;
}
}
const
char
*
dndNodeProcStr
(
EDndNodeType
ntype
)
{
switch
(
ntype
)
{
case
VNODE
:
return
"taosv"
;
case
QNODE
:
return
"taosq"
;
case
SNODE
:
return
"taoss"
;
case
MNODE
:
return
"taosm"
;
case
BNODE
:
return
"taosb"
;
default:
return
"taosd"
;
}
}
const
char
*
dndEventStr
(
EDndEvent
ev
)
{
switch
(
ev
)
{
case
DND_EVENT_START
:
return
"start"
;
case
DND_EVENT_STOP
:
return
"stop"
;
case
DND_EVENT_CHILD
:
return
"child"
;
default:
return
"UNKNOWN"
;
}
}
\ No newline at end of file
source/dnode/mgmt/main/dndEnv.c
浏览文件 @
4fedc23b
...
...
@@ -57,63 +57,3 @@ void dndCleanup() {
taosStopCacheRefreshWorker
();
dInfo
(
"dnode env is cleaned up"
);
}
const
char
*
dndStatStr
(
EDndStatus
status
)
{
switch
(
status
)
{
case
DND_STAT_INIT
:
return
"init"
;
case
DND_STAT_RUNNING
:
return
"running"
;
case
DND_STAT_STOPPED
:
return
"stopped"
;
default:
return
"UNKNOWN"
;
}
}
const
char
*
dndNodeLogStr
(
EDndType
ntype
)
{
switch
(
ntype
)
{
case
VNODES
:
return
"vnode"
;
case
QNODE
:
return
"qnode"
;
case
SNODE
:
return
"snode"
;
case
MNODE
:
return
"mnode"
;
case
BNODE
:
return
"bnode"
;
default:
return
"taosd"
;
}
}
const
char
*
dndNodeProcStr
(
EDndType
ntype
)
{
switch
(
ntype
)
{
case
VNODES
:
return
"taosv"
;
case
QNODE
:
return
"taosq"
;
case
SNODE
:
return
"taoss"
;
case
MNODE
:
return
"taosm"
;
case
BNODE
:
return
"taosb"
;
default:
return
"taosd"
;
}
}
const
char
*
dndEventStr
(
EDndEvent
ev
)
{
switch
(
ev
)
{
case
DND_EVENT_START
:
return
"start"
;
case
DND_EVENT_STOP
:
return
"stop"
;
case
DND_EVENT_CHILD
:
return
"child"
;
default:
return
"UNKNOWN"
;
}
}
\ No newline at end of file
source/dnode/mgmt/main/dndExec.c
浏览文件 @
4fedc23b
...
...
@@ -29,7 +29,7 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) {
static
int32_t
dndInitNodeProc
(
SMgmtWrapper
*
pWrapper
)
{
int32_t
shmsize
=
tsMnodeShmSize
;
if
(
pWrapper
->
ntype
==
VNODE
S
)
{
if
(
pWrapper
->
ntype
==
VNODE
)
{
shmsize
=
tsVnodeShmSize
;
}
else
if
(
pWrapper
->
ntype
==
QNODE
)
{
shmsize
=
tsQnodeShmSize
;
...
...
@@ -43,18 +43,18 @@ static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) {
return
-
1
;
}
if
(
taosCreateShm
(
&
pWrapper
->
s
hm
,
pWrapper
->
ntype
,
shmsize
)
!=
0
)
{
if
(
taosCreateShm
(
&
pWrapper
->
procS
hm
,
pWrapper
->
ntype
,
shmsize
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
terrno
);
dError
(
"node:%s, failed to create shm size:%d since %s"
,
pWrapper
->
name
,
shmsize
,
terrstr
());
return
-
1
;
}
dInfo
(
"node:%s, shm:%d is created, size:%d"
,
pWrapper
->
name
,
pWrapper
->
s
hm
.
id
,
shmsize
);
dInfo
(
"node:%s, shm:%d is created, size:%d"
,
pWrapper
->
name
,
pWrapper
->
procS
hm
.
id
,
shmsize
);
SProcCfg
cfg
=
dndGenProcCfg
(
pWrapper
);
cfg
.
isChild
=
false
;
pWrapper
->
procType
=
PROC_PARENT
;
pWrapper
->
p
Proc
=
taosProcInit
(
&
cfg
);
if
(
pWrapper
->
p
Proc
==
NULL
)
{
pWrapper
->
procType
=
DND_
PROC_PARENT
;
pWrapper
->
p
rocObj
=
taosProcInit
(
&
cfg
);
if
(
pWrapper
->
p
rocObj
==
NULL
)
{
dError
(
"node:%s, failed to create proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -62,7 +62,7 @@ static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) {
return
0
;
}
static
int32_t
dndNewNodeProc
(
SMgmtWrapper
*
pWrapper
,
EDndType
n
)
{
static
int32_t
dndNewNodeProc
(
SMgmtWrapper
*
pWrapper
,
EDnd
Node
Type
n
)
{
char
tstr
[
8
]
=
{
0
};
char
*
args
[
6
]
=
{
0
};
snprintf
(
tstr
,
sizeof
(
tstr
),
"%d"
,
n
);
...
...
@@ -85,7 +85,7 @@ static int32_t dndNewNodeProc(SMgmtWrapper *pWrapper, EDndType n) {
}
static
int32_t
dndRunNodeProc
(
SMgmtWrapper
*
pWrapper
)
{
if
(
pWrapper
->
pDnode
->
ntype
==
NODE_
MAX
)
{
if
(
pWrapper
->
pDnode
->
ntype
==
NODE_
END
)
{
dInfo
(
"node:%s, should be started manually"
,
pWrapper
->
name
);
}
else
{
if
(
dndNewNodeProc
(
pWrapper
,
pWrapper
->
ntype
)
!=
0
)
{
...
...
@@ -93,7 +93,7 @@ static int32_t dndRunNodeProc(SMgmtWrapper *pWrapper) {
}
}
if
(
taosProcRun
(
pWrapper
->
p
Proc
)
!=
0
)
{
if
(
taosProcRun
(
pWrapper
->
p
rocObj
)
!=
0
)
{
dError
(
"node:%s, failed to run proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -120,9 +120,9 @@ static int32_t dndOpenNodeImp(SMgmtWrapper *pWrapper) {
int32_t
dndOpenNode
(
SMgmtWrapper
*
pWrapper
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
if
(
pDnode
->
p
rocType
==
PROC_SINGLE
)
{
if
(
pDnode
->
p
type
==
DND_
PROC_SINGLE
)
{
return
dndOpenNodeImp
(
pWrapper
);
}
else
if
(
pDnode
->
p
rocType
==
PROC_PARENT
)
{
}
else
if
(
pDnode
->
p
type
==
DND_
PROC_PARENT
)
{
if
(
dndInitNodeProc
(
pWrapper
)
!=
0
)
return
-
1
;
if
(
dndWriteShmFile
(
pDnode
)
!=
0
)
return
-
1
;
if
(
dndRunNodeProc
(
pWrapper
)
!=
0
)
return
-
1
;
...
...
@@ -144,15 +144,15 @@ static void dndCloseNodeImp(SMgmtWrapper *pWrapper) {
taosMsleep
(
10
);
}
if
(
pWrapper
->
p
Proc
)
{
taosProcCleanup
(
pWrapper
->
p
Proc
);
pWrapper
->
p
Proc
=
NULL
;
if
(
pWrapper
->
p
rocObj
)
{
taosProcCleanup
(
pWrapper
->
p
rocObj
);
pWrapper
->
p
rocObj
=
NULL
;
}
dDebug
(
"node:%s, mgmt has been closed"
,
pWrapper
->
name
);
}
void
dndCloseNode
(
SMgmtWrapper
*
pWrapper
)
{
if
(
pWrapper
->
pDnode
->
p
rocType
==
PROC_PARENT
)
{
if
(
pWrapper
->
pDnode
->
p
type
==
DND_
PROC_PARENT
)
{
if
(
pWrapper
->
procId
>
0
&&
taosProcExist
(
pWrapper
->
procId
))
{
dInfo
(
"node:%s, send kill signal to the child process:%d"
,
pWrapper
->
name
,
pWrapper
->
procId
);
taosKillProc
(
pWrapper
->
procId
);
...
...
@@ -172,9 +172,9 @@ static void dndProcessProcHandle(void *handle) {
static
int32_t
dndRunInSingleProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode run in single process"
);
pDnode
->
p
rocType
=
PROC_SINGLE
;
pDnode
->
p
type
=
DND_
PROC_SINGLE
;
for
(
EDnd
Type
n
=
DNODE
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
NODE_BEGIN
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
pWrapper
->
required
=
dndRequireNode
(
pWrapper
);
if
(
!
pWrapper
->
required
)
continue
;
...
...
@@ -187,7 +187,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dndSetStatus
(
pDnode
,
DND_STAT_RUNNING
);
for
(
EDnd
Type
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
0
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
pWrapper
->
fp
.
startFp
==
NULL
)
continue
;
...
...
@@ -213,15 +213,15 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
static
int32_t
dndRunInParentProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode run in parent process"
);
pDnode
->
p
rocType
=
PROC_PARENT
;
pDnode
->
p
type
=
DND_
PROC_PARENT
;
SMgmtWrapper
*
pDWrapper
=
&
pDnode
->
wrappers
[
DNODE
];
SMgmtWrapper
*
pDWrapper
=
&
pDnode
->
wrappers
[
NODE_BEGIN
];
if
(
dndOpenNodeImp
(
pDWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pDWrapper
->
name
,
terrstr
());
return
-
1
;
}
for
(
EDnd
Type
n
=
DNODE
+
1
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
NODE_BEGIN
+
1
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
pWrapper
->
required
=
dndRequireNode
(
pWrapper
);
if
(
!
pWrapper
->
required
)
continue
;
...
...
@@ -233,7 +233,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
return
-
1
;
}
for
(
EDnd
Type
n
=
DNODE
+
1
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
NODE_BEGIN
+
1
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
dndRunNodeProc
(
pWrapper
)
!=
0
)
return
-
1
;
...
...
@@ -254,10 +254,10 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
dInfo
(
"dnode is about to stop"
);
dndSetStatus
(
pDnode
,
DND_STAT_STOPPED
);
for
(
EDnd
Type
n
=
DNODE
+
1
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
NODE_BEGIN
+
1
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
pDnode
->
ntype
==
NODE_
MAX
)
continue
;
if
(
pDnode
->
ntype
==
NODE_
END
)
continue
;
if
(
pWrapper
->
procId
>
0
&&
taosProcExist
(
pWrapper
->
procId
))
{
dInfo
(
"node:%s, send kill signal to the child process:%d"
,
pWrapper
->
name
,
pWrapper
->
procId
);
...
...
@@ -269,14 +269,14 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
}
break
;
}
else
{
for
(
EDnd
Type
n
=
DNODE
+
1
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
NODE_BEGIN
+
1
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
pDnode
->
ntype
==
NODE_
MAX
)
continue
;
if
(
pDnode
->
ntype
==
NODE_
END
)
continue
;
if
(
pWrapper
->
procId
<=
0
||
!
taosProcExist
(
pWrapper
->
procId
))
{
dWarn
(
"node:%s, process:%d is killed and needs to be restarted"
,
pWrapper
->
name
,
pWrapper
->
procId
);
taosProcCloseHandles
(
pWrapper
->
p
Proc
,
dndProcessProcHandle
);
taosProcCloseHandles
(
pWrapper
->
p
rocObj
,
dndProcessProcHandle
);
dndNewNodeProc
(
pWrapper
,
n
);
}
}
...
...
@@ -291,7 +291,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
static
int32_t
dndRunInChildProcess
(
SDnode
*
pDnode
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
pDnode
->
ntype
];
dInfo
(
"%s run in child process"
,
pWrapper
->
name
);
pDnode
->
p
rocType
=
PROC_CHILD
;
pDnode
->
p
type
=
DND_
PROC_CHILD
;
pWrapper
->
required
=
dndRequireNode
(
pWrapper
);
if
(
!
pWrapper
->
required
)
{
...
...
@@ -301,7 +301,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
SMsgCb
msgCb
=
dndCreateMsgcb
(
pWrapper
);
tmsgSetDefaultMsgCb
(
&
msgCb
);
pWrapper
->
procType
=
PROC_CHILD
;
pWrapper
->
procType
=
DND_
PROC_CHILD
;
if
(
dndOpenNodeImp
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
...
...
@@ -310,8 +310,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
SProcCfg
cfg
=
dndGenProcCfg
(
pWrapper
);
cfg
.
isChild
=
true
;
pWrapper
->
p
Proc
=
taosProcInit
(
&
cfg
);
if
(
pWrapper
->
p
Proc
==
NULL
)
{
pWrapper
->
p
rocObj
=
taosProcInit
(
&
cfg
);
if
(
pWrapper
->
p
rocObj
==
NULL
)
{
dError
(
"node:%s, failed to create proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -325,7 +325,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
dndSetStatus
(
pDnode
,
DND_STAT_RUNNING
);
if
(
taosProcRun
(
pWrapper
->
p
Proc
)
!=
0
)
{
if
(
taosProcRun
(
pWrapper
->
p
rocObj
)
!=
0
)
{
dError
(
"node:%s, failed to run proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -347,7 +347,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
int32_t
dndRun
(
SDnode
*
pDnode
)
{
if
(
!
tsMultiProcess
)
{
return
dndRunInSingleProcess
(
pDnode
);
}
else
if
(
pDnode
->
ntype
==
DNODE
||
pDnode
->
ntype
==
NODE_MAX
)
{
}
else
if
(
pDnode
->
ntype
==
NODE_BEGIN
||
pDnode
->
ntype
==
NODE_END
)
{
return
dndRunInParentProcess
(
pDnode
);
}
else
{
return
dndRunInChildProcess
(
pDnode
);
...
...
source/dnode/mgmt/main/dndFile.c
浏览文件 @
4fedc23b
...
...
@@ -148,7 +148,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
cJSON
*
root
=
NULL
;
TdFilePtr
pFile
=
NULL
;
snprintf
(
file
,
sizeof
(
file
),
"%s%s.shmfile"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%s.shmfile"
,
pDnode
->
data
.
data
Dir
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
...
...
@@ -164,37 +164,37 @@ int32_t dndReadShmFile(SDnode *pDnode) {
goto
_OVER
;
}
for
(
EDnd
Type
ntype
=
DNODE
+
1
;
ntype
<
NODE_MAX
;
++
ntype
)
{
for
(
EDnd
NodeType
ntype
=
NODE_BEGIN
+
1
;
ntype
<
NODE_END
;
++
ntype
)
{
snprintf
(
itemName
,
sizeof
(
itemName
),
"%s_shmid"
,
dndNodeProcStr
(
ntype
));
cJSON
*
shmid
=
cJSON_GetObjectItem
(
root
,
itemName
);
if
(
shmid
&&
shmid
->
type
==
cJSON_Number
)
{
pDnode
->
wrappers
[
ntype
].
s
hm
.
id
=
shmid
->
valueint
;
pDnode
->
wrappers
[
ntype
].
procS
hm
.
id
=
shmid
->
valueint
;
}
snprintf
(
itemName
,
sizeof
(
itemName
),
"%s_shmsize"
,
dndNodeProcStr
(
ntype
));
cJSON
*
shmsize
=
cJSON_GetObjectItem
(
root
,
itemName
);
if
(
shmsize
&&
shmsize
->
type
==
cJSON_Number
)
{
pDnode
->
wrappers
[
ntype
].
s
hm
.
size
=
shmsize
->
valueint
;
pDnode
->
wrappers
[
ntype
].
procS
hm
.
size
=
shmsize
->
valueint
;
}
}
}
if
(
!
tsMultiProcess
||
pDnode
->
ntype
==
DNODE
||
pDnode
->
ntype
==
NODE_MAX
)
{
for
(
EDnd
Type
ntype
=
DNODE
;
ntype
<
NODE_MAX
;
++
ntype
)
{
if
(
!
tsMultiProcess
||
pDnode
->
ntype
==
NODE_BEGIN
||
pDnode
->
ntype
==
NODE_END
)
{
for
(
EDnd
NodeType
ntype
=
NODE_BEGIN
;
ntype
<
NODE_END
;
++
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
if
(
pWrapper
->
s
hm
.
id
>=
0
)
{
dDebug
(
"shmid:%d, is closed, size:%d"
,
pWrapper
->
shm
.
id
,
pWrapper
->
s
hm
.
size
);
taosDropShm
(
&
pWrapper
->
s
hm
);
if
(
pWrapper
->
procS
hm
.
id
>=
0
)
{
dDebug
(
"shmid:%d, is closed, size:%d"
,
pWrapper
->
procShm
.
id
,
pWrapper
->
procS
hm
.
size
);
taosDropShm
(
&
pWrapper
->
procS
hm
);
}
}
}
else
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
pDnode
->
ntype
];
if
(
taosAttachShm
(
&
pWrapper
->
s
hm
)
!=
0
)
{
if
(
taosAttachShm
(
&
pWrapper
->
procS
hm
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"shmid:%d, failed to attach shm since %s"
,
pWrapper
->
s
hm
.
id
,
terrstr
());
dError
(
"shmid:%d, failed to attach shm since %s"
,
pWrapper
->
procS
hm
.
id
,
terrstr
());
goto
_OVER
;
}
dInfo
(
"node:%s, shmid:%d is attached, size:%d"
,
pWrapper
->
name
,
pWrapper
->
shm
.
id
,
pWrapper
->
s
hm
.
size
);
dInfo
(
"node:%s, shmid:%d is attached, size:%d"
,
pWrapper
->
name
,
pWrapper
->
procShm
.
id
,
pWrapper
->
procS
hm
.
size
);
}
dDebug
(
"successed to load %s"
,
file
);
...
...
@@ -215,8 +215,8 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
char
realfile
[
PATH_MAX
]
=
{
0
};
TdFilePtr
pFile
=
NULL
;
snprintf
(
file
,
sizeof
(
file
),
"%s%s.shmfile.bak"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%s.shmfile"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%s.shmfile.bak"
,
pDnode
->
data
.
data
Dir
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%s.shmfile"
,
pDnode
->
data
.
data
Dir
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
...
...
@@ -226,13 +226,13 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
}
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"{
\n
"
);
for
(
EDnd
Type
ntype
=
DNODE
+
1
;
ntype
<
NODE_MAX
;
++
ntype
)
{
for
(
EDnd
NodeType
ntype
=
NODE_BEGIN
+
1
;
ntype
<
NODE_END
;
++
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmid
\"
:%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
s
hm
.
id
);
if
(
ntype
==
NODE_
MAX
-
1
)
{
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:%d
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
s
hm
.
size
);
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmid
\"
:%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
procS
hm
.
id
);
if
(
ntype
==
NODE_
END
-
1
)
{
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:%d
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
procS
hm
.
size
);
}
else
{
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
s
hm
.
size
);
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
procS
hm
.
size
);
}
}
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"}
\n
"
);
...
...
source/dnode/mgmt/main/dnd
Int
.c
→
source/dnode/mgmt/main/dnd
Obj
.c
浏览文件 @
4fedc23b
...
...
@@ -17,27 +17,27 @@
#include "dndInt.h"
static
int32_t
dndInitVars
(
SDnode
*
pDnode
,
const
SDnodeOpt
*
pOption
)
{
pDnode
->
numOfS
upportVnodes
=
pOption
->
numOfSupportVnodes
;
pDnode
->
serverPort
=
pOption
->
serverPort
;
pDnode
->
dataDir
=
strdup
(
pOption
->
dataDir
);
pDnode
->
localEp
=
strdup
(
pOption
->
localEp
);
pDnode
->
localFqdn
=
strdup
(
pOption
->
localFqdn
);
pDnode
->
firstEp
=
strdup
(
pOption
->
firstEp
);
pDnode
->
secondEp
=
strdup
(
pOption
->
secondEp
);
pDnode
->
disks
=
pOption
->
disks
;
pDnode
->
numOfDisks
=
pOption
->
numOfDisks
;
pDnode
->
data
.
s
upportVnodes
=
pOption
->
numOfSupportVnodes
;
pDnode
->
data
.
serverPort
=
pOption
->
serverPort
;
pDnode
->
data
.
data
Dir
=
strdup
(
pOption
->
dataDir
);
pDnode
->
data
.
localEp
=
strdup
(
pOption
->
localEp
);
pDnode
->
data
.
localFqdn
=
strdup
(
pOption
->
localFqdn
);
pDnode
->
data
.
firstEp
=
strdup
(
pOption
->
firstEp
);
pDnode
->
data
.
secondEp
=
strdup
(
pOption
->
secondEp
);
pDnode
->
d
ata
.
d
isks
=
pOption
->
disks
;
pDnode
->
data
.
numOfDisks
=
pOption
->
numOfDisks
;
pDnode
->
ntype
=
pOption
->
ntype
;
pDnode
->
rebootTime
=
taosGetTimestampMs
();
pDnode
->
data
.
rebootTime
=
taosGetTimestampMs
();
if
(
pDnode
->
data
Dir
==
NULL
||
pDnode
->
localEp
==
NULL
||
pDnode
->
localFqdn
==
NULL
||
pDnode
->
firstEp
==
NULL
||
pDnode
->
secondEp
==
NULL
)
{
if
(
pDnode
->
data
.
dataDir
==
NULL
||
pDnode
->
data
.
localEp
==
NULL
||
pDnode
->
data
.
localFqdn
==
NULL
||
pDnode
->
data
.
firstEp
==
NULL
||
pDnode
->
data
.
secondEp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
!
tsMultiProcess
||
pDnode
->
ntype
==
DNODE
||
pDnode
->
ntype
==
NODE_MAX
)
{
pDnode
->
lockfile
=
dndCheckRunning
(
pDnode
->
dataDir
);
if
(
pDnode
->
lockfile
==
NULL
)
{
if
(
!
tsMultiProcess
||
pDnode
->
ntype
==
NODE_BEGIN
||
pDnode
->
ntype
==
NODE_END
)
{
pDnode
->
data
.
lockfile
=
dndCheckRunning
(
pDnode
->
data
.
dataDir
);
if
(
pDnode
->
data
.
lockfile
==
NULL
)
{
return
-
1
;
}
}
...
...
@@ -46,20 +46,20 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
}
static
void
dndClearVars
(
SDnode
*
pDnode
)
{
for
(
EDnd
Type
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
0
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pMgmt
=
&
pDnode
->
wrappers
[
n
];
taosMemoryFreeClear
(
pMgmt
->
path
);
}
if
(
pDnode
->
lockfile
!=
NULL
)
{
taosUnLockFile
(
pDnode
->
lockfile
);
taosCloseFile
(
&
pDnode
->
lockfile
);
pDnode
->
lockfile
=
NULL
;
if
(
pDnode
->
data
.
lockfile
!=
NULL
)
{
taosUnLockFile
(
pDnode
->
data
.
lockfile
);
taosCloseFile
(
&
pDnode
->
data
.
lockfile
);
pDnode
->
data
.
lockfile
=
NULL
;
}
taosMemoryFreeClear
(
pDnode
->
localEp
);
taosMemoryFreeClear
(
pDnode
->
localFqdn
);
taosMemoryFreeClear
(
pDnode
->
firstEp
);
taosMemoryFreeClear
(
pDnode
->
secondEp
);
taosMemoryFreeClear
(
pDnode
->
dataDir
);
taosMemoryFreeClear
(
pDnode
->
data
.
localEp
);
taosMemoryFreeClear
(
pDnode
->
data
.
localFqdn
);
taosMemoryFreeClear
(
pDnode
->
data
.
firstEp
);
taosMemoryFreeClear
(
pDnode
->
data
.
secondEp
);
taosMemoryFreeClear
(
pDnode
->
data
.
data
Dir
);
taosMemoryFree
(
pDnode
);
dDebug
(
"dnode memory is cleared, data:%p"
,
pDnode
);
}
...
...
@@ -82,18 +82,18 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
}
dndSetStatus
(
pDnode
,
DND_STAT_INIT
);
dmSetMgmtFp
(
&
pDnode
->
wrappers
[
DNODE
]);
dmSetMgmtFp
(
&
pDnode
->
wrappers
[
NODE_BEGIN
]);
mmSetMgmtFp
(
&
pDnode
->
wrappers
[
MNODE
]);
vmSetMgmtFp
(
&
pDnode
->
wrappers
[
VNODE
S
]);
vmSetMgmtFp
(
&
pDnode
->
wrappers
[
VNODE
]);
qmSetMgmtFp
(
&
pDnode
->
wrappers
[
QNODE
]);
smSetMgmtFp
(
&
pDnode
->
wrappers
[
SNODE
]);
bmSetMgmtFp
(
&
pDnode
->
wrappers
[
BNODE
]);
for
(
EDnd
Type
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
0
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
snprintf
(
path
,
sizeof
(
path
),
"%s%s%s"
,
pDnode
->
dataDir
,
TD_DIRSEP
,
pWrapper
->
name
);
snprintf
(
path
,
sizeof
(
path
),
"%s%s%s"
,
pDnode
->
data
.
data
Dir
,
TD_DIRSEP
,
pWrapper
->
name
);
pWrapper
->
path
=
strdup
(
path
);
pWrapper
->
s
hm
.
id
=
-
1
;
pWrapper
->
procS
hm
.
id
=
-
1
;
pWrapper
->
pDnode
=
pDnode
;
pWrapper
->
ntype
=
n
;
if
(
pWrapper
->
path
==
NULL
)
{
...
...
@@ -101,7 +101,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
goto
_OVER
;
}
pWrapper
->
procType
=
PROC_SINGLE
;
pWrapper
->
procType
=
DND_
PROC_SINGLE
;
taosInitRWLatch
(
&
pWrapper
->
latch
);
}
...
...
@@ -134,7 +134,7 @@ _OVER:
void
dndClose
(
SDnode
*
pDnode
)
{
if
(
pDnode
==
NULL
)
return
;
for
(
EDnd
Type
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
0
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
dndCloseNode
(
pWrapper
);
}
...
...
@@ -149,7 +149,7 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
}
}
SMgmtWrapper
*
dndAcquireWrapper
(
SDnode
*
pDnode
,
EDndType
ntype
)
{
SMgmtWrapper
*
dndAcquireWrapper
(
SDnode
*
pDnode
,
EDnd
Node
Type
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
SMgmtWrapper
*
pRetWrapper
=
pWrapper
;
...
...
@@ -170,7 +170,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t
code
=
0
;
taosRLockLatch
(
&
pWrapper
->
latch
);
if
(
pWrapper
->
deployed
||
(
pWrapper
->
procType
==
PROC_PARENT
&&
pWrapper
->
required
))
{
if
(
pWrapper
->
deployed
||
(
pWrapper
->
procType
==
DND_
PROC_PARENT
&&
pWrapper
->
required
))
{
int32_t
refCount
=
atomic_add_fetch_32
(
&
pWrapper
->
refCount
,
1
);
dTrace
(
"node:%s, is marked, refCount:%d"
,
pWrapper
->
name
,
refCount
);
}
else
{
...
...
@@ -196,15 +196,6 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp
pWrapper
->
msgVgIds
[
TMSG_INDEX
(
msgType
)]
=
vgId
;
}
EDndStatus
dndGetStatus
(
SDnode
*
pDnode
)
{
return
pDnode
->
status
;
}
void
dndSetStatus
(
SDnode
*
pDnode
,
EDndStatus
status
)
{
if
(
pDnode
->
status
!=
status
)
{
dDebug
(
"dnode status set from %s to %s"
,
dndStatStr
(
pDnode
->
status
),
dndStatStr
(
status
));
pDnode
->
status
=
status
;
}
}
void
dndReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
)
{
SStartupReq
*
pStartup
=
&
pDnode
->
startup
;
tstrncpy
(
pStartup
->
name
,
pName
,
TSDB_STEP_NAME_LEN
);
...
...
source/dnode/mgmt/main/dndTransport.c
浏览文件 @
4fedc23b
...
...
@@ -21,7 +21,7 @@
#define INTERNAL_SECRET "_pwd"
static
void
dndUpdateMnodeEpSet
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
DNODE
];
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
NODE_BEGIN
];
dmUpdateMnodeEpSet
(
pWrapper
->
pMgmt
,
pEpSet
);
}
...
...
@@ -53,7 +53,7 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS
int32_t
code
=
-
1
;
SNodeMsg
*
pMsg
=
NULL
;
NodeMsgFp
msgFp
=
NULL
;
uint16_t
msgType
=
pRpc
->
msgType
;
uint16_t
msgType
=
pRpc
->
msgType
;
if
(
pEpSet
&&
pEpSet
->
numOfEps
>
0
&&
msgType
==
TDMT_MND_STATUS_RSP
)
{
dndUpdateMnodeEpSet
(
pWrapper
->
pDnode
,
pEpSet
);
...
...
@@ -64,12 +64,12 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS
if
((
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)))
==
NULL
)
goto
_OVER
;
if
(
dndBuildMsg
(
pMsg
,
pRpc
)
!=
0
)
goto
_OVER
;
if
(
pWrapper
->
procType
==
PROC_SINGLE
)
{
if
(
pWrapper
->
procType
==
DND_
PROC_SINGLE
)
{
dTrace
(
"msg:%p, is created, handle:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pMsg
->
user
);
code
=
(
*
msgFp
)(
pWrapper
,
pMsg
);
}
else
if
(
pWrapper
->
procType
==
PROC_PARENT
)
{
}
else
if
(
pWrapper
->
procType
==
DND_
PROC_PARENT
)
{
dTrace
(
"msg:%p, is created and put into child queue, handle:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pMsg
->
user
);
code
=
taosProcPutToChildQ
(
pWrapper
->
p
Proc
,
pMsg
,
sizeof
(
SNodeMsg
),
pRpc
->
pCont
,
pRpc
->
contLen
,
pRpc
->
handle
,
code
=
taosProcPutToChildQ
(
pWrapper
->
p
rocObj
,
pMsg
,
sizeof
(
SNodeMsg
),
pRpc
->
pCont
,
pRpc
->
contLen
,
pRpc
->
handle
,
PROC_REQ
);
}
else
{
dTrace
(
"msg:%p, should not processed in child process, handle:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pMsg
->
user
);
...
...
@@ -78,7 +78,7 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS
_OVER:
if
(
code
==
0
)
{
if
(
pWrapper
->
procType
==
PROC_PARENT
)
{
if
(
pWrapper
->
procType
==
DND_
PROC_PARENT
)
{
dTrace
(
"msg:%p, is freed in parent process"
,
pMsg
);
taosFreeQitem
(
pMsg
);
rpcFreeCont
(
pRpc
->
pCont
);
...
...
@@ -105,11 +105,11 @@ _OVER:
}
static
void
dndProcessMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
S
TransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
S
DnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
tmsg_t
msgType
=
pMsg
->
msgType
;
bool
isReq
=
msgType
&
1u
;
SMsgHandle
*
pHandle
=
&
p
Mgmt
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMgmtWrapper
*
pWrapper
=
pHandle
->
pWrapper
;
SMsgHandle
*
pHandle
=
&
p
Trans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMgmtWrapper
*
pWrapper
=
pHandle
->
p
Nd
Wrapper
;
if
(
msgType
==
TDMT_DND_NETWORK_TEST
)
{
dTrace
(
"network test req will be processed, handle:%p, app:%p"
,
pMsg
->
handle
,
pMsg
->
ahandle
);
...
...
@@ -159,7 +159,7 @@ static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static
int32_t
dndInitClient
(
SDnode
*
pDnode
)
{
S
TransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
S
DnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
...
...
@@ -178,8 +178,8 @@ static int32_t dndInitClient(SDnode *pDnode) {
taosEncryptPass_c
((
uint8_t
*
)(
INTERNAL_SECRET
),
strlen
(
INTERNAL_SECRET
),
pass
);
rpcInit
.
secret
=
pass
;
p
Mgmt
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
p
Mgmt
->
clientRpc
==
NULL
)
{
p
Trans
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
p
Trans
->
clientRpc
==
NULL
)
{
dError
(
"failed to init dnode rpc client"
);
return
-
1
;
}
...
...
@@ -189,17 +189,17 @@ static int32_t dndInitClient(SDnode *pDnode) {
}
static
void
dndCleanupClient
(
SDnode
*
pDnode
)
{
S
TransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
if
(
p
Mgmt
->
clientRpc
)
{
rpcClose
(
p
Mgmt
->
clientRpc
);
p
Mgmt
->
clientRpc
=
NULL
;
S
DnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
if
(
p
Trans
->
clientRpc
)
{
rpcClose
(
p
Trans
->
clientRpc
);
p
Trans
->
clientRpc
=
NULL
;
dDebug
(
"dnode rpc client is closed"
);
}
}
static
inline
void
dndSendMsgToMnodeRecv
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
)
{
SEpSet
epSet
=
{
0
};
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
DNODE
];
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
NODE_BEGIN
];
dmGetMnodeEpSet
(
pWrapper
->
pMgmt
,
&
epSet
);
rpcSendRecv
(
pDnode
->
trans
.
clientRpc
,
&
epSet
,
pReq
,
pRsp
);
}
...
...
@@ -263,11 +263,11 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch
}
static
int32_t
dndInitServer
(
SDnode
*
pDnode
)
{
S
TransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
S
DnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
pDnode
->
serverPort
;
rpcInit
.
localPort
=
pDnode
->
data
.
serverPort
;
rpcInit
.
label
=
"DND"
;
rpcInit
.
numOfThreads
=
tsNumOfRpcThreads
;
rpcInit
.
cfp
=
(
RpcCfp
)
dndProcessMsg
;
...
...
@@ -277,8 +277,8 @@ static int32_t dndInitServer(SDnode *pDnode) {
rpcInit
.
afp
=
(
RpcAfp
)
dndRetrieveUserAuthInfo
;
rpcInit
.
parent
=
pDnode
;
p
Mgmt
->
serverRpc
=
rpcOpen
(
&
rpcInit
);
if
(
p
Mgmt
->
serverRpc
==
NULL
)
{
p
Trans
->
serverRpc
=
rpcOpen
(
&
rpcInit
);
if
(
p
Trans
->
serverRpc
==
NULL
)
{
dError
(
"failed to init dnode rpc server"
);
return
-
1
;
}
...
...
@@ -288,10 +288,10 @@ static int32_t dndInitServer(SDnode *pDnode) {
}
static
void
dndCleanupServer
(
SDnode
*
pDnode
)
{
S
TransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
if
(
p
Mgmt
->
serverRpc
)
{
rpcClose
(
p
Mgmt
->
serverRpc
);
p
Mgmt
->
serverRpc
=
NULL
;
S
DnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
if
(
p
Trans
->
serverRpc
)
{
rpcClose
(
p
Trans
->
serverRpc
);
p
Trans
->
serverRpc
=
NULL
;
dDebug
(
"dnode rpc server is closed"
);
}
}
...
...
@@ -308,9 +308,9 @@ void dndCleanupTrans(SDnode *pDnode) {
}
int32_t
dndInitMsgHandle
(
SDnode
*
pDnode
)
{
S
TransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
S
DnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
for
(
EDnd
Type
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
for
(
EDnd
NodeType
n
=
0
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
for
(
int32_t
msgIndex
=
0
;
msgIndex
<
TDMT_MAX
;
++
msgIndex
)
{
...
...
@@ -318,7 +318,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
int8_t
vgId
=
pWrapper
->
msgVgIds
[
msgIndex
];
if
(
msgFp
==
NULL
)
continue
;
SMsgHandle
*
pHandle
=
&
p
Mgmt
->
msgHandles
[
msgIndex
];
SMsgHandle
*
pHandle
=
&
p
Trans
->
msgHandles
[
msgIndex
];
if
(
vgId
==
QNODE_HANDLE
)
{
if
(
pHandle
->
pQndWrapper
!=
NULL
)
{
dError
(
"msg:%s has multiple process nodes"
,
tMsgInfo
[
msgIndex
]);
...
...
@@ -332,11 +332,11 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
}
pHandle
->
pMndWrapper
=
pWrapper
;
}
else
{
if
(
pHandle
->
pWrapper
!=
NULL
)
{
if
(
pHandle
->
p
Nd
Wrapper
!=
NULL
)
{
dError
(
"msg:%s has multiple process nodes"
,
tMsgInfo
[
msgIndex
]);
return
-
1
;
}
pHandle
->
pWrapper
=
pWrapper
;
pHandle
->
p
Nd
Wrapper
=
pWrapper
;
}
}
}
...
...
@@ -344,13 +344,13 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
return
0
;
}
static
int32_t
dndSendRpcReq
(
S
TransMgmt
*
pMgmt
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
if
(
p
Mgmt
->
clientRpc
==
NULL
)
{
static
int32_t
dndSendRpcReq
(
S
DnodeTrans
*
pTrans
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
if
(
p
Trans
->
clientRpc
==
NULL
)
{
terrno
=
TSDB_CODE_NODE_OFFLINE
;
return
-
1
;
}
rpcSendRequest
(
p
Mgmt
->
clientRpc
,
pEpSet
,
pReq
,
NULL
);
rpcSendRequest
(
p
Trans
->
clientRpc
,
pEpSet
,
pReq
,
NULL
);
return
0
;
}
...
...
@@ -369,7 +369,7 @@ static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg
return
-
1
;
}
if
(
pWrapper
->
procType
!=
PROC_CHILD
)
{
if
(
pWrapper
->
procType
!=
DND_
PROC_CHILD
)
{
return
dndSendRpcReq
(
&
pWrapper
->
pDnode
->
trans
,
pEpSet
,
pReq
);
}
else
{
char
*
pHead
=
taosMemoryMalloc
(
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
));
...
...
@@ -380,7 +380,7 @@ static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg
memcpy
(
pHead
,
pReq
,
sizeof
(
SRpcMsg
));
memcpy
(
pHead
+
sizeof
(
SRpcMsg
),
pEpSet
,
sizeof
(
SEpSet
));
taosProcPutToParentQ
(
pWrapper
->
p
Proc
,
pHead
,
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
),
pReq
->
pCont
,
pReq
->
contLen
,
taosProcPutToParentQ
(
pWrapper
->
p
rocObj
,
pHead
,
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
),
pReq
->
pCont
,
pReq
->
contLen
,
PROC_REQ
);
taosMemoryFree
(
pHead
);
return
0
;
...
...
@@ -388,27 +388,27 @@ static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg
}
static
void
dndSendRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
)
{
if
(
pWrapper
->
procType
!=
PROC_CHILD
)
{
if
(
pWrapper
->
procType
!=
DND_
PROC_CHILD
)
{
dndSendRpcRsp
(
pWrapper
,
pRsp
);
}
else
{
taosProcPutToParentQ
(
pWrapper
->
p
Proc
,
pRsp
,
sizeof
(
SRpcMsg
),
pRsp
->
pCont
,
pRsp
->
contLen
,
PROC_RSP
);
taosProcPutToParentQ
(
pWrapper
->
p
rocObj
,
pRsp
,
sizeof
(
SRpcMsg
),
pRsp
->
pCont
,
pRsp
->
contLen
,
PROC_RSP
);
}
}
static
void
dndRegisterBrokenLinkArg
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
)
{
if
(
pWrapper
->
procType
!=
PROC_CHILD
)
{
if
(
pWrapper
->
procType
!=
DND_
PROC_CHILD
)
{
rpcRegisterBrokenLinkArg
(
pMsg
);
}
else
{
taosProcPutToParentQ
(
pWrapper
->
p
Proc
,
pMsg
,
sizeof
(
SRpcMsg
),
pMsg
->
pCont
,
pMsg
->
contLen
,
PROC_REGIST
);
taosProcPutToParentQ
(
pWrapper
->
p
rocObj
,
pMsg
,
sizeof
(
SRpcMsg
),
pMsg
->
pCont
,
pMsg
->
contLen
,
PROC_REGIST
);
}
}
static
void
dndReleaseHandle
(
SMgmtWrapper
*
pWrapper
,
void
*
handle
,
int8_t
type
)
{
if
(
pWrapper
->
procType
!=
PROC_CHILD
)
{
if
(
pWrapper
->
procType
!=
DND_
PROC_CHILD
)
{
rpcReleaseHandle
(
handle
,
type
);
}
else
{
SRpcMsg
msg
=
{.
handle
=
handle
,
.
code
=
type
};
taosProcPutToParentQ
(
pWrapper
->
p
Proc
,
&
msg
,
sizeof
(
SRpcMsg
),
NULL
,
0
,
PROC_RELEASE
);
taosProcPutToParentQ
(
pWrapper
->
p
rocObj
,
&
msg
,
sizeof
(
SRpcMsg
),
NULL
,
0
,
PROC_RELEASE
);
}
}
...
...
@@ -456,7 +456,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
rpcRegisterBrokenLinkArg
(
pMsg
);
break
;
case
PROC_RELEASE
:
taosProcRemoveHandle
(
pWrapper
->
p
Proc
,
pMsg
->
handle
);
taosProcRemoveHandle
(
pWrapper
->
p
rocObj
,
pMsg
->
handle
);
rpcReleaseHandle
(
pMsg
->
handle
,
(
int8_t
)
pMsg
->
code
);
rpcFreeCont
(
pCont
);
break
;
...
...
@@ -464,7 +464,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
dndSendRpcReq
(
&
pWrapper
->
pDnode
->
trans
,
(
SEpSet
*
)((
char
*
)
pMsg
+
sizeof
(
SRpcMsg
)),
pMsg
);
break
;
case
PROC_RSP
:
taosProcRemoveHandle
(
pWrapper
->
p
Proc
,
pMsg
->
handle
);
taosProcRemoveHandle
(
pWrapper
->
p
rocObj
,
pMsg
->
handle
);
dndSendRpcRsp
(
pWrapper
,
pMsg
);
break
;
default:
...
...
@@ -484,7 +484,7 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
s
hm
,
.
shm
=
pWrapper
->
procS
hm
,
.
parent
=
pWrapper
,
.
name
=
pWrapper
->
name
};
return
cfg
;
...
...
source/dnode/mgmt/mm/mmHandle.c
浏览文件 @
4fedc23b
...
...
@@ -56,7 +56,7 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
createReq
.
replica
<=
1
||
createReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
createReq
.
replica
<=
1
||
createReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -75,7 +75,7 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
dropReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
dropReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -95,9 +95,9 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
alterReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
alterReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to alter mnode since %s, dnodeId:%d input:%d"
,
terrstr
(),
pDnode
->
dnodeId
,
alterReq
.
dnodeId
);
dError
(
"failed to alter mnode since %s, dnodeId:%d input:%d"
,
terrstr
(),
pDnode
->
d
ata
.
d
nodeId
,
alterReq
.
dnodeId
);
return
-
1
;
}
else
{
return
mmAlter
(
pMgmt
,
&
alterReq
);
...
...
source/dnode/mgmt/mm/mmInt.c
浏览文件 @
4fedc23b
...
...
@@ -18,9 +18,9 @@
#include "wal.h"
static
bool
mmDeployRequired
(
SDnode
*
pDnode
)
{
if
(
pDnode
->
dnodeId
>
0
)
return
false
;
if
(
pDnode
->
clusterId
>
0
)
return
false
;
if
(
strcmp
(
pDnode
->
localEp
,
pDnode
->
firstEp
)
!=
0
)
return
false
;
if
(
pDnode
->
d
ata
.
d
nodeId
>
0
)
return
false
;
if
(
pDnode
->
data
.
clusterId
>
0
)
return
false
;
if
(
strcmp
(
pDnode
->
data
.
localEp
,
pDnode
->
data
.
firstEp
)
!=
0
)
return
false
;
return
true
;
}
...
...
@@ -53,8 +53,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption
->
selfIndex
=
0
;
SReplica
*
pReplica
=
&
pOption
->
replicas
[
0
];
pReplica
->
id
=
1
;
pReplica
->
port
=
pMgmt
->
pDnode
->
serverPort
;
tstrncpy
(
pReplica
->
fqdn
,
pMgmt
->
pDnode
->
localFqdn
,
TSDB_FQDN_LEN
);
pReplica
->
port
=
pMgmt
->
pDnode
->
data
.
serverPort
;
tstrncpy
(
pReplica
->
fqdn
,
pMgmt
->
pDnode
->
data
.
localFqdn
,
TSDB_FQDN_LEN
);
pOption
->
deploy
=
true
;
pMgmt
->
selfIndex
=
pOption
->
selfIndex
;
...
...
@@ -80,7 +80,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
pReplica
->
id
=
pCreate
->
replicas
[
i
].
id
;
pReplica
->
port
=
pCreate
->
replicas
[
i
].
port
;
memcpy
(
pReplica
->
fqdn
,
pCreate
->
replicas
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
if
(
pReplica
->
id
==
pMgmt
->
pDnode
->
dnodeId
)
{
if
(
pReplica
->
id
==
pMgmt
->
pDnode
->
d
ata
.
d
nodeId
)
{
pOption
->
selfIndex
=
i
;
}
}
...
...
@@ -112,8 +112,8 @@ static int32_t mmOpenImp(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq) {
if
(
!
deployed
)
{
dInfo
(
"mnode start to deploy"
);
if
(
pMgmt
->
pWrapper
->
procType
==
PROC_CHILD
)
{
pMgmt
->
pDnode
->
dnodeId
=
1
;
if
(
pMgmt
->
pWrapper
->
procType
==
DND_
PROC_CHILD
)
{
pMgmt
->
pDnode
->
d
ata
.
d
nodeId
=
1
;
}
mmBuildOptionForDeploy
(
pMgmt
,
&
option
);
}
else
{
...
...
@@ -230,8 +230,8 @@ void mmSetMgmtFp(SMgmtWrapper *pWrapper) {
mgmtFp
.
openFp
=
mmOpen
;
mgmtFp
.
closeFp
=
mmClose
;
mgmtFp
.
startFp
=
mmStart
;
mgmtFp
.
create
Msg
Fp
=
mmProcessCreateReq
;
mgmtFp
.
drop
Msg
Fp
=
mmProcessDropReq
;
mgmtFp
.
createFp
=
mmProcessCreateReq
;
mgmtFp
.
dropFp
=
mmProcessDropReq
;
mgmtFp
.
requiredFp
=
mmRequire
;
mmInitMsgHandle
(
pWrapper
);
...
...
source/dnode/mgmt/qm/qmHandle.c
浏览文件 @
4fedc23b
...
...
@@ -53,7 +53,7 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
createReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
createReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to create qnode since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -72,7 +72,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
dropReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
dropReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to drop qnode since %s"
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/qm/qmInt.c
浏览文件 @
4fedc23b
...
...
@@ -116,8 +116,8 @@ void qmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
qmOpen
;
mgmtFp
.
closeFp
=
qmClose
;
mgmtFp
.
create
Msg
Fp
=
qmProcessCreateReq
;
mgmtFp
.
drop
Msg
Fp
=
qmProcessDropReq
;
mgmtFp
.
createFp
=
qmProcessCreateReq
;
mgmtFp
.
dropFp
=
qmProcessDropReq
;
mgmtFp
.
requiredFp
=
qmRequire
;
qmInitMsgHandle
(
pWrapper
);
...
...
source/dnode/mgmt/sm/smHandle.c
浏览文件 @
4fedc23b
...
...
@@ -53,7 +53,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
createReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
createReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to create snode since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -72,7 +72,7 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return
-
1
;
}
if
(
dropReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
if
(
dropReq
.
dnodeId
!=
pDnode
->
d
ata
.
d
nodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to drop snode since %s"
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/sm/smInt.c
浏览文件 @
4fedc23b
...
...
@@ -113,8 +113,8 @@ void smSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
smOpen
;
mgmtFp
.
closeFp
=
smClose
;
mgmtFp
.
create
Msg
Fp
=
smProcessCreateReq
;
mgmtFp
.
drop
Msg
Fp
=
smProcessDropReq
;
mgmtFp
.
createFp
=
smProcessCreateReq
;
mgmtFp
.
dropFp
=
smProcessDropReq
;
mgmtFp
.
requiredFp
=
smRequire
;
smInitMsgHandle
(
pWrapper
);
...
...
source/dnode/mgmt/vm/vmInt.c
浏览文件 @
4fedc23b
...
...
@@ -278,11 +278,11 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
taosInitRWLatch
(
&
pMgmt
->
latch
);
SDiskCfg
dCfg
=
{
0
};
tstrncpy
(
dCfg
.
dir
,
pDnode
->
dataDir
,
TSDB_FILENAME_LEN
);
tstrncpy
(
dCfg
.
dir
,
pDnode
->
data
.
data
Dir
,
TSDB_FILENAME_LEN
);
dCfg
.
level
=
0
;
dCfg
.
primary
=
1
;
SDiskCfg
*
pDisks
=
pDnode
->
disks
;
int32_t
numOfDisks
=
pDnode
->
numOfDisks
;
SDiskCfg
*
pDisks
=
pDnode
->
d
ata
.
d
isks
;
int32_t
numOfDisks
=
pDnode
->
data
.
numOfDisks
;
if
(
numOfDisks
<=
0
||
pDisks
==
NULL
)
{
pDisks
=
&
dCfg
;
numOfDisks
=
1
;
...
...
@@ -329,7 +329,7 @@ _OVER:
static
int32_t
vmRequire
(
SMgmtWrapper
*
pWrapper
,
bool
*
required
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
*
required
=
pDnode
->
numOfS
upportVnodes
>
0
;
*
required
=
pDnode
->
data
.
s
upportVnodes
>
0
;
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
4fedc23b
...
...
@@ -354,7 +354,7 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) {
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
bool
dnodeChanged
=
(
statusReq
.
d
v
er
!=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
));
bool
dnodeChanged
=
(
statusReq
.
d
nodeV
er
!=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
));
bool
reboot
=
(
pDnode
->
rebootTime
!=
statusReq
.
rebootTime
);
bool
needCheck
=
!
online
||
dnodeChanged
||
reboot
;
...
...
@@ -405,7 +405,7 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) {
pDnode
->
numOfSupportVnodes
=
statusReq
.
numOfSupportVnodes
;
SStatusRsp
statusRsp
=
{
0
};
statusRsp
.
d
v
er
=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
);
statusRsp
.
d
nodeV
er
=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
);
statusRsp
.
dnodeCfg
.
dnodeId
=
pDnode
->
id
;
statusRsp
.
dnodeCfg
.
clusterId
=
pMnode
->
clusterId
;
statusRsp
.
pDnodeEps
=
taosArrayInit
(
mndGetDnodeSize
(
pMnode
),
sizeof
(
SDnodeEp
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录