Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
766039a7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
766039a7
编写于
12月 23, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-10431 minor changes
上级
d96963a2
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
334 addition
and
76 deletion
+334
-76
source/dnode/mgmt/impl/test/CMakeLists.txt
source/dnode/mgmt/impl/test/CMakeLists.txt
+1
-1
source/dnode/mgmt/impl/test/mnode/CMakeLists.txt
source/dnode/mgmt/impl/test/mnode/CMakeLists.txt
+11
-0
source/dnode/mgmt/impl/test/mnode/mnode.cpp
source/dnode/mgmt/impl/test/mnode/mnode.cpp
+260
-0
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+62
-75
未找到文件。
source/dnode/mgmt/impl/test/CMakeLists.txt
浏览文件 @
766039a7
...
...
@@ -7,7 +7,7 @@ add_subdirectory(cluster)
add_subdirectory
(
db
)
add_subdirectory
(
dnode
)
# add_subdirectory(func)
#
add_subdirectory(mnode)
add_subdirectory
(
mnode
)
add_subdirectory
(
profile
)
add_subdirectory
(
show
)
add_subdirectory
(
stb
)
...
...
source/dnode/mgmt/impl/test/mnode/CMakeLists.txt
0 → 100644
浏览文件 @
766039a7
aux_source_directory
(
. MTEST_SRC
)
add_executable
(
dnode_test_mnode
${
MTEST_SRC
}
)
target_link_libraries
(
dnode_test_mnode
PUBLIC sut
)
add_test
(
NAME dnode_test_mnode
COMMAND dnode_test_mnode
)
source/dnode/mgmt/impl/test/mnode/mnode.cpp
0 → 100644
浏览文件 @
766039a7
/**
* @file dnode.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module dnode-msg tests
* @version 0.1
* @date 2021-12-15
*
* @copyright Copyright (c) 2021
*
*/
#include "base.h"
class
DndTestMnode
:
public
::
testing
::
Test
{
public:
void
SetUp
()
override
{}
void
TearDown
()
override
{}
public:
static
void
SetUpTestSuite
()
{
test
.
Init
(
"/tmp/dnode_test_mnode1"
,
9061
);
const
char
*
fqdn
=
"localhost"
;
const
char
*
firstEp
=
"localhost:9061"
;
server2
.
Start
(
"/tmp/dnode_test_mnode2"
,
fqdn
,
9062
,
firstEp
);
server3
.
Start
(
"/tmp/dnode_test_mnode3"
,
fqdn
,
9063
,
firstEp
);
server4
.
Start
(
"/tmp/dnode_test_mnode4"
,
fqdn
,
9064
,
firstEp
);
server5
.
Start
(
"/tmp/dnode_test_mnode5"
,
fqdn
,
9065
,
firstEp
);
taosMsleep
(
300
);
}
static
void
TearDownTestSuite
()
{
server2
.
Stop
();
server3
.
Stop
();
server4
.
Stop
();
server5
.
Stop
();
test
.
Cleanup
();
}
static
Testbase
test
;
static
TestServer
server2
;
static
TestServer
server3
;
static
TestServer
server4
;
static
TestServer
server5
;
};
Testbase
DndTestMnode
::
test
;
TestServer
DndTestMnode
::
server2
;
TestServer
DndTestMnode
::
server3
;
TestServer
DndTestMnode
::
server4
;
TestServer
DndTestMnode
::
server5
;
TEST_F
(
DndTestMnode
,
01
_ShowDnode
)
{
test
.
SendShowMetaMsg
(
TSDB_MGMT_TABLE_DNODE
,
""
);
CHECK_META
(
"show dnodes"
,
7
);
CHECK_SCHEMA
(
0
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"id"
);
CHECK_SCHEMA
(
1
,
TSDB_DATA_TYPE_BINARY
,
TSDB_EP_LEN
+
VARSTR_HEADER_SIZE
,
"endpoint"
);
CHECK_SCHEMA
(
2
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"vnodes"
);
CHECK_SCHEMA
(
3
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"max_vnodes"
);
CHECK_SCHEMA
(
4
,
TSDB_DATA_TYPE_BINARY
,
10
+
VARSTR_HEADER_SIZE
,
"status"
);
CHECK_SCHEMA
(
5
,
TSDB_DATA_TYPE_TIMESTAMP
,
8
,
"create_time"
);
CHECK_SCHEMA
(
6
,
TSDB_DATA_TYPE_BINARY
,
24
+
VARSTR_HEADER_SIZE
,
"offline_reason"
);
test
.
SendShowRetrieveMsg
();
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
CheckInt16
(
1
);
CheckBinary
(
"localhost:9061"
,
TSDB_EP_LEN
);
CheckInt16
(
0
);
CheckInt16
(
1
);
CheckBinary
(
"ready"
,
10
);
CheckTimestamp
();
CheckBinary
(
""
,
24
);
}
#if 0
TEST_F(DndTestMnode, 02_ConfigDnode) {
int32_t contLen = sizeof(SCfgDnodeMsg);
SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
strcpy(pReq->config, "ddebugflag 131");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CONFIG_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
TEST_F(DndTestMnode, 03_Create_Drop_Restart_Dnode) {
{
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9062");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
taosMsleep(1300);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
CheckInt16(1);
CheckInt16(2);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("localhost:9062", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
{
int32_t contLen = sizeof(SDropDnodeMsg);
SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(1);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
{
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9063");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
{
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9064");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
{
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9065");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
taosMsleep(1300);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
CheckInt16(5);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("localhost:9063", TSDB_EP_LEN);
CheckBinary("localhost:9064", TSDB_EP_LEN);
CheckBinary("localhost:9065", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
// restart
uInfo("stop all server");
test.Restart();
server2.Restart();
server3.Restart();
server4.Restart();
server5.Restart();
taosMsleep(1300);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
CheckInt16(5);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("localhost:9063", TSDB_EP_LEN);
CheckBinary("localhost:9064", TSDB_EP_LEN);
CheckBinary("localhost:9065", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
}
#endif
\ No newline at end of file
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
766039a7
...
...
@@ -23,13 +23,13 @@
#define TSDB_MNODE_RESERVE_SIZE 64
static
int32_t
mndCreateDefaultMnode
(
SMnode
*
pMnode
);
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
p
Mnode
Obj
);
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
pObj
);
static
SSdbRow
*
mndMnodeActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndMnodeActionInsert
(
SSdb
*
pSdb
,
SMnodeObj
*
p
Mnode
Obj
);
static
int32_t
mndMnodeActionDelete
(
SSdb
*
pSdb
,
SMnodeObj
*
p
Mnode
Obj
);
static
int32_t
mndMnodeActionInsert
(
SSdb
*
pSdb
,
SMnodeObj
*
pObj
);
static
int32_t
mndMnodeActionDelete
(
SSdb
*
pSdb
,
SMnodeObj
*
pObj
);
static
int32_t
mndMnodeActionUpdate
(
SSdb
*
pSdb
,
SMnodeObj
*
pOldMnode
,
SMnodeObj
*
pNewMnode
);
static
int32_t
mndProcessCreateMnode
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropMnode
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateMnode
Req
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropMnode
Req
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateMnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropMnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetMnodeMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
...
...
@@ -46,8 +46,8 @@ int32_t mndInitMnode(SMnode *pMnode) {
.
updateFp
=
(
SdbUpdateFp
)
mndMnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndMnodeActionDelete
};
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_MNODE
,
mndProcessCreateMnode
Msg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_MNODE
,
mndProcessDropMnode
Msg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_MNODE
,
mndProcessCreateMnode
Req
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_MNODE
,
mndProcessDropMnode
Req
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP
,
mndProcessCreateMnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_MNODE_IN_RSP
,
mndProcessDropMnodeRsp
);
...
...
@@ -69,9 +69,9 @@ static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
return
pObj
;
}
static
void
mndReleaseMnode
(
SMnode
*
pMnode
,
SMnodeObj
*
p
Mnode
Obj
)
{
static
void
mndReleaseMnode
(
SMnode
*
pMnode
,
SMnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
p
Mnode
Obj
);
sdbRelease
(
pSdb
,
pObj
);
}
char
*
mndGetRoleStr
(
int32_t
showType
)
{
...
...
@@ -101,14 +101,14 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
return
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
}
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
p
Mnode
Obj
)
{
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
pObj
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_MNODE
,
TSDB_MNODE_VER_NUMBER
,
sizeof
(
SMnodeObj
)
+
TSDB_MNODE_RESERVE_SIZE
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
p
Mnode
Obj
->
id
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
p
Mnode
Obj
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
p
Mnode
Obj
->
updateTime
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pObj
->
id
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pObj
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pObj
->
updateTime
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_MNODE_RESERVE_SIZE
)
return
pRaw
;
...
...
@@ -125,42 +125,38 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SMnodeObj
));
SMnodeObj
*
p
Mnode
Obj
=
sdbGetRowObj
(
pRow
);
if
(
p
Mnode
Obj
==
NULL
)
return
NULL
;
SMnodeObj
*
pObj
=
sdbGetRowObj
(
pRow
);
if
(
pObj
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
p
Mnode
Obj
->
id
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
p
Mnode
Obj
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
p
Mnode
Obj
->
updateTime
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
id
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
updateTime
)
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_MNODE_RESERVE_SIZE
)
return
pRow
;
}
static
void
mnodeResetMnode
(
SMnodeObj
*
pMnodeObj
)
{
pMnodeObj
->
role
=
TAOS_SYNC_STATE_FOLLOWER
;
pMnodeObj
->
roleTerm
=
0
;
pMnodeObj
->
roleTime
=
0
;
}
static
void
mnodeResetMnode
(
SMnodeObj
*
pObj
)
{
pObj
->
role
=
TAOS_SYNC_STATE_FOLLOWER
;
}
static
int32_t
mndMnodeActionInsert
(
SSdb
*
pSdb
,
SMnodeObj
*
p
Mnode
Obj
)
{
mTrace
(
"mnode:%d, perform insert action"
,
p
Mnode
Obj
->
id
);
p
MnodeObj
->
pDnode
=
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
pMnode
Obj
->
id
);
if
(
p
Mnode
Obj
->
pDnode
==
NULL
)
{
static
int32_t
mndMnodeActionInsert
(
SSdb
*
pSdb
,
SMnodeObj
*
pObj
)
{
mTrace
(
"mnode:%d, perform insert action"
,
pObj
->
id
);
p
Obj
->
pDnode
=
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
p
Obj
->
id
);
if
(
pObj
->
pDnode
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
mError
(
"mnode:%d, failed to perform insert action since %s"
,
p
Mnode
Obj
->
id
,
terrstr
());
mError
(
"mnode:%d, failed to perform insert action since %s"
,
pObj
->
id
,
terrstr
());
return
-
1
;
}
mnodeResetMnode
(
p
Mnode
Obj
);
mnodeResetMnode
(
pObj
);
return
0
;
}
static
int32_t
mndMnodeActionDelete
(
SSdb
*
pSdb
,
SMnodeObj
*
p
Mnode
Obj
)
{
mTrace
(
"mnode:%d, perform delete action"
,
p
Mnode
Obj
->
id
);
if
(
p
Mnode
Obj
->
pDnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
p
Mnode
Obj
->
pDnode
);
p
Mnode
Obj
->
pDnode
=
NULL
;
static
int32_t
mndMnodeActionDelete
(
SSdb
*
pSdb
,
SMnodeObj
*
pObj
)
{
mTrace
(
"mnode:%d, perform delete action"
,
pObj
->
id
);
if
(
pObj
->
pDnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
pObj
->
pDnode
);
pObj
->
pDnode
=
NULL
;
}
return
0
;
...
...
@@ -168,8 +164,6 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) {
static
int32_t
mndMnodeActionUpdate
(
SSdb
*
pSdb
,
SMnodeObj
*
pOldMnode
,
SMnodeObj
*
pNewMnode
)
{
mTrace
(
"mnode:%d, perform update action"
,
pOldMnode
->
id
);
pOldMnode
->
id
=
pNewMnode
->
id
;
pOldMnode
->
createdTime
=
pNewMnode
->
createdTime
;
pOldMnode
->
updateTime
=
pNewMnode
->
updateTime
;
return
0
;
}
...
...
@@ -177,12 +171,12 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj
bool
mndIsMnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SMnodeObj
*
p
Mnode
Obj
=
sdbAcquire
(
pSdb
,
SDB_MNODE
,
&
dnodeId
);
if
(
p
Mnode
Obj
==
NULL
)
{
SMnodeObj
*
pObj
=
sdbAcquire
(
pSdb
,
SDB_MNODE
,
&
dnodeId
);
if
(
pObj
==
NULL
)
{
return
false
;
}
sdbRelease
(
pSdb
,
p
Mnode
Obj
);
sdbRelease
(
pSdb
,
pObj
);
return
true
;
}
...
...
@@ -193,14 +187,14 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
void
*
pIter
=
NULL
;
while
(
1
)
{
SMnodeObj
*
p
Mnode
Obj
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
p
Mnode
Obj
);
SMnodeObj
*
pObj
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pObj
);
if
(
pIter
==
NULL
)
break
;
if
(
p
Mnode
Obj
->
pDnode
==
NULL
)
break
;
if
(
pObj
->
pDnode
==
NULL
)
break
;
pEpSet
->
port
[
pEpSet
->
numOfEps
]
=
htons
(
p
Mnode
Obj
->
pDnode
->
port
);
tstrncpy
(
pEpSet
->
fqdn
[
pEpSet
->
numOfEps
],
pMnode
Obj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
p
Mnode
Obj
->
role
==
TAOS_SYNC_STATE_LEADER
)
{
pEpSet
->
port
[
pEpSet
->
numOfEps
]
=
htons
(
pObj
->
pDnode
->
port
);
memcpy
(
pEpSet
->
fqdn
[
pEpSet
->
numOfEps
],
p
Obj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
pObj
->
role
==
TAOS_SYNC_STATE_LEADER
)
{
pEpSet
->
inUse
=
pEpSet
->
numOfEps
;
}
...
...
@@ -210,7 +204,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
static
int32_t
mndCreateMnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SCreateMnodeMsg
*
pCreate
)
{
SMnodeObj
mnodeObj
=
{
0
};
mnodeObj
.
id
=
1
;
// todo
mnodeObj
.
id
=
sdbGetMaxId
(
pMnode
->
pSdb
,
SDB_MNODE
);
mnodeObj
.
createdTime
=
taosGetTimestampMs
();
mnodeObj
.
updateTime
=
mnodeObj
.
createdTime
;
...
...
@@ -255,7 +249,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *
return
0
;
}
static
int32_t
mndProcessCreateMnode
Msg
(
SMnodeMsg
*
pMsg
)
{
static
int32_t
mndProcessCreateMnode
Req
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SCreateMnodeMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
...
...
@@ -271,9 +265,9 @@ static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) {
}
mndReleaseDnode
(
pMnode
,
pDnode
);
SMnodeObj
*
p
Mnode
Obj
=
mndAcquireMnode
(
pMnode
,
pCreate
->
dnodeId
);
if
(
p
Mnode
Obj
!=
NULL
)
{
mError
(
"mnode:%d, mnode already exist"
,
p
Mnode
Obj
->
id
);
SMnodeObj
*
pObj
=
mndAcquireMnode
(
pMnode
,
pCreate
->
dnodeId
);
if
(
pObj
!=
NULL
)
{
mError
(
"mnode:%d, mnode already exist"
,
pObj
->
id
);
terrno
=
TSDB_CODE_MND_MNODE_ALREADY_EXIST
;
return
-
1
;
}
...
...
@@ -288,15 +282,15 @@ static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) {
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndDropMnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SMnodeObj
*
p
Mnode
Obj
)
{
static
int32_t
mndDropMnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SMnodeObj
*
pObj
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
if
(
pTrans
==
NULL
)
{
mError
(
"mnode:%d, failed to drop since %s"
,
p
Mnode
Obj
->
id
,
terrstr
());
mError
(
"mnode:%d, failed to drop since %s"
,
pObj
->
id
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to drop user:%d"
,
pTrans
->
id
,
p
Mnode
Obj
->
id
);
mDebug
(
"trans:%d, used to drop user:%d"
,
pTrans
->
id
,
pObj
->
id
);
SSdbRaw
*
pRedoRaw
=
mndMnodeActionEncode
(
p
Mnode
Obj
);
SSdbRaw
*
pRedoRaw
=
mndMnodeActionEncode
(
pObj
);
if
(
pRedoRaw
==
NULL
||
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append redo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
...
...
@@ -304,7 +298,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeOb
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPING
);
SSdbRaw
*
pUndoRaw
=
mndMnodeActionEncode
(
p
Mnode
Obj
);
SSdbRaw
*
pUndoRaw
=
mndMnodeActionEncode
(
pObj
);
if
(
pUndoRaw
==
NULL
||
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append undo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
...
...
@@ -312,7 +306,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeOb
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
);
SSdbRaw
*
pCommitRaw
=
mndMnodeActionEncode
(
p
Mnode
Obj
);
SSdbRaw
*
pCommitRaw
=
mndMnodeActionEncode
(
pObj
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
...
...
@@ -330,7 +324,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeOb
return
0
;
}
static
int32_t
mndProcessDropMnode
Msg
(
SMnodeMsg
*
pMsg
)
{
static
int32_t
mndProcessDropMnode
Req
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SDropMnodeMsg
*
pDrop
=
pMsg
->
rpcMsg
.
pCont
;
pDrop
->
dnodeId
=
htonl
(
pDrop
->
dnodeId
);
...
...
@@ -343,14 +337,14 @@ static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) {
return
-
1
;
}
SMnodeObj
*
p
Mnode
Obj
=
mndAcquireMnode
(
pMnode
,
pDrop
->
dnodeId
);
if
(
p
Mnode
Obj
==
NULL
)
{
SMnodeObj
*
pObj
=
mndAcquireMnode
(
pMnode
,
pDrop
->
dnodeId
);
if
(
pObj
==
NULL
)
{
mError
(
"mnode:%d, not exist"
,
pDrop
->
dnodeId
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
}
int32_t
code
=
mndDropMnode
(
pMnode
,
pMsg
,
p
Mnode
Obj
);
int32_t
code
=
mndDropMnode
(
pMnode
,
pMsg
,
pObj
);
if
(
code
!=
0
)
{
mError
(
"mnode:%d, failed to drop since %s"
,
pMnode
->
dnodeId
,
terrstr
());
...
...
@@ -422,46 +416,39 @@ static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
SMnodeObj
*
p
Mnode
Obj
=
NULL
;
SMnodeObj
*
pObj
=
NULL
;
char
*
pWrite
;
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pShow
->
pIter
,
(
void
**
)
&
p
Mnode
Obj
);
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pShow
->
pIter
,
(
void
**
)
&
pObj
);
if
(
pShow
->
pIter
==
NULL
)
break
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
p
Mnode
Obj
->
id
;
*
(
int16_t
*
)
pWrite
=
pObj
->
id
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pMnodeObj
->
id
);
if
(
pDnode
!=
NULL
)
{
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pDnode
->
ep
,
pShow
->
bytes
[
cols
]);
}
else
{
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
"invalid ep"
,
pShow
->
bytes
[
cols
]);
}
mndReleaseDnode
(
pMnode
,
pDnode
);
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pObj
->
pDnode
->
ep
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
char
*
roles
=
mndGetRoleStr
(
p
Mnode
Obj
->
role
);
char
*
roles
=
mndGetRoleStr
(
pObj
->
role
);
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
roles
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
p
Mnode
Obj
->
roleTime
;
*
(
int64_t
*
)
pWrite
=
pObj
->
roleTime
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
p
Mnode
Obj
->
createdTime
;
*
(
int64_t
*
)
pWrite
=
pObj
->
createdTime
;
cols
++
;
numOfRows
++
;
sdbRelease
(
pSdb
,
p
Mnode
Obj
);
sdbRelease
(
pSdb
,
pObj
);
}
mndVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录