Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
40dbc1cd
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
40dbc1cd
编写于
12月 01, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
xMerge remote-tracking branch 'origin/feature/dnode3' into 3.0
上级
e1234672
bb887daf
变更
22
隐藏空白更改
内联
并排
Showing
22 changed file
with
646 addition
and
258 deletion
+646
-258
include/common/taosmsg.h
include/common/taosmsg.h
+8
-3
include/dnode/mgmt/dnode.h
include/dnode/mgmt/dnode.h
+18
-88
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+15
-112
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+28
-1
source/dnode/mgmt/daemon/src/daemon.c
source/dnode/mgmt/daemon/src/daemon.c
+5
-1
source/dnode/mgmt/impl/src/dndDnode.c
source/dnode/mgmt/impl/src/dndDnode.c
+11
-5
source/dnode/mgmt/impl/src/dndMnode.c
source/dnode/mgmt/impl/src/dndMnode.c
+6
-0
source/dnode/mnode/impl/CMakeLists.txt
source/dnode/mnode/impl/CMakeLists.txt
+1
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+43
-17
source/dnode/mnode/impl/inc/mndDnode.h
source/dnode/mnode/impl/inc/mndDnode.h
+4
-2
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+6
-0
source/dnode/mnode/impl/inc/mndMnode.h
source/dnode/mnode/impl/inc/mndMnode.h
+1
-2
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+3
-3
source/dnode/mnode/impl/src/mndAcct.c
source/dnode/mnode/impl/src/mndAcct.c
+2
-2
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+326
-4
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+115
-6
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+6
-6
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+13
-3
source/dnode/mnode/sdb/inc/sdbInt.h
source/dnode/mnode/sdb/inc/sdbInt.h
+2
-1
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+2
-1
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+1
-1
source/dnode/mnode/sdb/src/sdbRaw.c
source/dnode/mnode/sdb/src/sdbRaw.c
+30
-0
未找到文件。
include/common/taosmsg.h
浏览文件 @
40dbc1cd
...
...
@@ -63,6 +63,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_USER, "drop-user" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_DNODE
,
"create-dnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CONFIG_DNODE
,
"config-dnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_DNODE
,
"drop-dnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_MNODE
,
"create-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_MNODE
,
"drop-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_DB
,
"create-db"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_DB
,
"drop-db"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_USE_DB
,
"use-db"
)
...
...
@@ -631,7 +633,7 @@ typedef struct {
typedef
struct
{
int32_t
statusInterval
;
int
8_t
reserved
[
4
]
;
int
32_t
mnodeEqualVnodeNum
;
int64_t
checkTime
;
// 1970-01-01 00:00:00.000
char
timezone
[
TSDB_TIMEZONE_LEN
];
// tsTimezone
char
locale
[
TSDB_LOCALE_LEN
];
// tsLocale
...
...
@@ -654,11 +656,14 @@ typedef struct {
}
SVnodeLoads
;
typedef
struct
SStatusMsg
{
uint32_t
sversion
;
int32_t
sver
;
int32_t
dnodeId
;
int64_t
clusterId
;
uint32_t
rebootTime
;
// time stamp for last reboot
int32_t
numOfCores
;
int16_t
numOfCores
;
int16_t
numOfSupportMnodes
;
int16_t
numOfSupportVnodes
;
int16_t
numOfSupportQnodes
;
char
dnodeEp
[
TSDB_EP_LEN
];
SClusterCfg
clusterCfg
;
SVnodeLoads
vnodeLoads
;
...
...
include/dnode/mgmt/dnode.h
浏览文件 @
40dbc1cd
...
...
@@ -26,95 +26,25 @@ extern "C" {
typedef
struct
SDnode
SDnode
;
typedef
struct
{
/**
* @brief software version of the program.
*
*/
int32_t
sver
;
/**
* @brief num of CPU cores.
*
*/
int32_t
numOfCores
;
/**
* @brief number of threads per CPU core.
*
*/
float
numOfThreadsPerCore
;
/**
* @brief the proportion of total CPU cores available for query processing.
*
*/
float
ratioOfQueryCores
;
/**
* @brief max number of connections allowed in dnode.
*
*/
int32_t
maxShellConns
;
/**
* @brief time interval of heart beat from shell to dnode, seconds.
*
*/
int32_t
shellActivityTimer
;
/**
* @brief time interval of dnode status reporting to mnode, seconds, for cluster only.
*
*/
int32_t
statusInterval
;
/**
* @brief first port number for the connection (12 continuous UDP/TCP port number are used).
*
*/
int32_t
sver
;
int16_t
numOfCores
;
int16_t
numOfSupportMnodes
;
int16_t
numOfSupportVnodes
;
int16_t
numOfSupportQnodes
;
int32_t
statusInterval
;
int32_t
mnodeEqualVnodeNum
;
float
numOfThreadsPerCore
;
float
ratioOfQueryCores
;
int32_t
maxShellConns
;
int32_t
shellActivityTimer
;
uint16_t
serverPort
;
/**
* @brief data file's directory.
*
*/
char
dataDir
[
TSDB_FILENAME_LEN
];
/**
* @brief local endpoint.
*
*/
char
localEp
[
TSDB_EP_LEN
];
/**
* @brieflocal fully qualified domain name (FQDN).
*
*/
char
localFqdn
[
TSDB_FQDN_LEN
];
/**
* @brief first fully qualified domain name (FQDN) for TDengine system.
*
*/
char
firstEp
[
TSDB_EP_LEN
];
/**
* @brief system time zone.
*
*/
char
timezone
[
TSDB_TIMEZONE_LEN
];
/**
* @brief system locale.
*
*/
char
locale
[
TSDB_LOCALE_LEN
];
/**
* @briefdefault system charset.
*
*/
char
charset
[
TSDB_LOCALE_LEN
];
char
dataDir
[
TSDB_FILENAME_LEN
];
char
localEp
[
TSDB_EP_LEN
];
char
localFqdn
[
TSDB_FQDN_LEN
];
char
firstEp
[
TSDB_EP_LEN
];
char
timezone
[
TSDB_TIMEZONE_LEN
];
char
locale
[
TSDB_LOCALE_LEN
];
char
charset
[
TSDB_LOCALE_LEN
];
}
SDnodeOpt
;
/* ------------------------ SDnode ------------------------ */
...
...
include/dnode/mnode/mnode.h
浏览文件 @
40dbc1cd
...
...
@@ -30,133 +30,36 @@ typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef
int32_t
(
*
PutMsgToMnodeQFp
)(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
typedef
struct
SMnodeLoad
{
/**
* @brief the number of dnodes in cluster.
*
*/
int64_t
numOfDnode
;
/**
* @brief the number of mnodes in cluster.
*
*/
int64_t
numOfMnode
;
/**
* @brief the number of vgroups in cluster.
*
*/
int64_t
numOfVgroup
;
/**
* @brief the number of databases in cluster.
*
*/
int64_t
numOfDatabase
;
/**
* @brief the number of super tables in cluster.
*
*/
int64_t
numOfSuperTable
;
/**
* @brief the number of child tables in cluster.
*
*/
int64_t
numOfChildTable
;
/**
* @brief the number of normal tables in cluster.
*
*/
int64_t
numOfNormalTable
;
/**
* @brief the number of numOfTimeseries in cluster.
*
*/
int64_t
numOfColumn
;
/**
* @brief total points written in cluster.
*
*/
int64_t
totalPoints
;
/**
* @brief total storage in cluster.
*
*/
int64_t
totalStorage
;
/**
* @brief total compressed storage in cluster.
*
*/
int64_t
compStorage
;
}
SMnodeLoad
;
typedef
struct
{
/**
* @brief dnodeId of this mnode.
*
*/
int32_t
dnodeId
;
/**
* @brief clusterId of this mnode.
*
*/
int64_t
clusterId
;
/**
* @brief replica num of this mnode.
*
*/
int8_t
replica
;
/**
* @brief self index in the array of replicas.
*
*/
int8_t
selfIndex
;
/**
* @brief detail replica information of this mnode.
*
*/
SReplica
replicas
[
TSDB_MAX_REPLICA
];
/**
* @brief the parent dnode of this mnode.
*
*/
SDnode
*
pDnode
;
/**
* @brief put apply msg to the write queue in dnode.
*
*/
PutMsgToMnodeQFp
putMsgToApplyMsgFp
;
/**
* @brief the callback function while send msg to dnode.
*
*/
SendMsgToDnodeFp
sendMsgToDnodeFp
;
/**
* @brief the callback function while send msg to mnode.
*
*/
SendMsgToMnodeFp
sendMsgToMnodeFp
;
/**
* @brief the callback function while send redirect msg to clients or peers.
*
*/
int32_t
dnodeId
;
int64_t
clusterId
;
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
SDnode
*
pDnode
;
PutMsgToMnodeQFp
putMsgToApplyMsgFp
;
SendMsgToDnodeFp
sendMsgToDnodeFp
;
SendMsgToMnodeFp
sendMsgToMnodeFp
;
SendRedirectMsgFp
sendRedirectMsgFp
;
int32_t
sver
;
int32_t
statusInterval
;
int32_t
mnodeEqualVnodeNum
;
char
*
timezone
;
char
*
locale
;
char
*
charset
;
}
SMnodeOpt
;
/* ------------------------ SMnode ------------------------ */
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
40dbc1cd
...
...
@@ -38,6 +38,15 @@ extern "C" {
dataPos += sizeof(int32_t); \
}
#define SDB_GET_INT16(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt16(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \
return NULL; \
} \
dataPos += sizeof(int16_t); \
}
#define SDB_GET_INT8(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \
...
...
@@ -74,6 +83,15 @@ extern "C" {
dataPos += sizeof(int32_t); \
}
#define SDB_SET_INT16(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt16(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
} \
dataPos += sizeof(int16_t); \
}
#define SDB_SET_INT8(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \
...
...
@@ -100,6 +118,7 @@ extern "C" {
} \
}
typedef
struct
SMnode
SMnode
;
typedef
struct
SSdbRaw
SSdbRaw
;
typedef
struct
SSdbRow
SSdbRow
;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
}
EKeyType
;
...
...
@@ -130,7 +149,7 @@ typedef struct SSdb SSdb;
typedef
int32_t
(
*
SdbInsertFp
)(
SSdb
*
pSdb
,
void
*
pObj
);
typedef
int32_t
(
*
SdbUpdateFp
)(
SSdb
*
pSdb
,
void
*
pSrcObj
,
void
*
pDstObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
SSdb
*
pSdb
,
void
*
pObj
);
typedef
int32_t
(
*
SdbDeployFp
)(
S
Sdb
*
pSdb
);
typedef
int32_t
(
*
SdbDeployFp
)(
S
Mnode
*
pMnode
);
typedef
SSdbRow
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
...
...
@@ -190,6 +209,12 @@ typedef struct SSdbOpt {
*
*/
const
char
*
path
;
/**
* @brief The mnode object.
*
*/
SMnode
*
pMnode
;
}
SSdbOpt
;
/**
...
...
@@ -291,12 +316,14 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
SSdbRaw
*
sdbAllocRaw
(
ESdbType
type
,
int8_t
sver
,
int32_t
dataLen
);
void
sdbFreeRaw
(
SSdbRaw
*
pRaw
);
int32_t
sdbSetRawInt8
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int8_t
val
);
int32_t
sdbSetRawInt16
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int16_t
val
);
int32_t
sdbSetRawInt32
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int32_t
val
);
int32_t
sdbSetRawInt64
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int64_t
val
);
int32_t
sdbSetRawBinary
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
const
char
*
pVal
,
int32_t
valLen
);
int32_t
sdbSetRawDataLen
(
SSdbRaw
*
pRaw
,
int32_t
dataLen
);
int32_t
sdbSetRawStatus
(
SSdbRaw
*
pRaw
,
ESdbStatus
status
);
int32_t
sdbGetRawInt8
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int8_t
*
val
);
int32_t
sdbGetRawInt16
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int16_t
*
val
);
int32_t
sdbGetRawInt32
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int32_t
*
val
);
int32_t
sdbGetRawInt64
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int64_t
*
val
);
int32_t
sdbGetRawBinary
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
char
*
pVal
,
int32_t
valLen
);
...
...
source/dnode/mgmt/daemon/src/daemon.c
浏览文件 @
40dbc1cd
...
...
@@ -138,11 +138,15 @@ void dmnWaitSignal() {
void
dmnInitOption
(
SDnodeOpt
*
pOption
)
{
pOption
->
sver
=
tsVersion
;
pOption
->
numOfCores
=
tsNumOfCores
;
pOption
->
numOfSupportMnodes
=
1
;
pOption
->
numOfSupportVnodes
=
1
;
pOption
->
numOfSupportQnodes
=
1
;
pOption
->
statusInterval
=
tsStatusInterval
;
pOption
->
mnodeEqualVnodeNum
=
tsMnodeEqualVnodeNum
;
pOption
->
numOfThreadsPerCore
=
tsNumOfThreadsPerCore
;
pOption
->
ratioOfQueryCores
=
tsRatioOfQueryCores
;
pOption
->
maxShellConns
=
tsMaxShellConns
;
pOption
->
shellActivityTimer
=
tsShellActivityTimer
;
pOption
->
statusInterval
=
tsStatusInterval
;
pOption
->
serverPort
=
tsServerPort
;
tstrncpy
(
pOption
->
dataDir
,
tsDataDir
,
TSDB_FILENAME_LEN
);
tstrncpy
(
pOption
->
localEp
,
tsLocalEp
,
TSDB_EP_LEN
);
...
...
source/dnode/mgmt/impl/src/dndDnode.c
浏览文件 @
40dbc1cd
...
...
@@ -348,19 +348,25 @@ static void dndSendStatusMsg(SDnode *pDnode) {
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
taosRLockLatch
(
&
pMgmt
->
latch
);
pStatus
->
sver
sion
=
htonl
(
pDnode
->
opt
.
sver
);
pStatus
->
sver
=
htonl
(
pDnode
->
opt
.
sver
);
pStatus
->
dnodeId
=
htonl
(
pMgmt
->
dnodeId
);
pStatus
->
clusterId
=
htobe64
(
pMgmt
->
clusterId
);
pStatus
->
rebootTime
=
htonl
(
pMgmt
->
rebootTime
);
pStatus
->
numOfCores
=
htonl
(
pDnode
->
opt
.
numOfCores
);
pStatus
->
numOfCores
=
htons
(
pDnode
->
opt
.
numOfCores
);
pStatus
->
numOfSupportMnodes
=
htons
(
pDnode
->
opt
.
numOfCores
);
pStatus
->
numOfSupportVnodes
=
htons
(
pDnode
->
opt
.
numOfCores
);
pStatus
->
numOfSupportQnodes
=
htons
(
pDnode
->
opt
.
numOfCores
);
tstrncpy
(
pStatus
->
dnodeEp
,
pDnode
->
opt
.
localEp
,
TSDB_EP_LEN
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
pDnode
->
opt
.
statusInterval
);
tstrncpy
(
pStatus
->
clusterCfg
.
timezone
,
pDnode
->
opt
.
timezone
,
TSDB_TIMEZONE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
locale
,
pDnode
->
opt
.
locale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
pDnode
->
opt
.
charset
,
TSDB_LOCALE_LEN
);
pStatus
->
clusterCfg
.
mnodeEqualVnodeNum
=
htonl
(
pDnode
->
opt
.
mnodeEqualVnodeNum
);
pStatus
->
clusterCfg
.
checkTime
=
0
;
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
pStatus
->
clusterCfg
.
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
pStatus
->
clusterCfg
.
checkTime
=
htonl
(
pStatus
->
clusterCfg
.
checkTime
);
tstrncpy
(
pStatus
->
clusterCfg
.
timezone
,
pDnode
->
opt
.
timezone
,
TSDB_TIMEZONE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
locale
,
pDnode
->
opt
.
locale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
pDnode
->
opt
.
charset
,
TSDB_LOCALE_LEN
);
taosRUnLockLatch
(
&
pMgmt
->
latch
);
dndGetVnodeLoads
(
pDnode
,
&
pStatus
->
vnodeLoads
);
...
...
source/dnode/mgmt/impl/src/dndMnode.c
浏览文件 @
40dbc1cd
...
...
@@ -332,6 +332,12 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption
->
putMsgToApplyMsgFp
=
dndPutMsgIntoMnodeApplyQueue
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
sver
=
pDnode
->
opt
.
sver
;
pOption
->
statusInterval
=
pDnode
->
opt
.
statusInterval
;
pOption
->
mnodeEqualVnodeNum
=
pDnode
->
opt
.
mnodeEqualVnodeNum
;
pOption
->
timezone
=
pDnode
->
opt
.
timezone
;
pOption
->
charset
=
pDnode
->
opt
.
charset
;
pOption
->
locale
=
pDnode
->
opt
.
locale
;
}
static
void
dndBuildMnodeDeployOption
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
...
...
source/dnode/mnode/impl/CMakeLists.txt
浏览文件 @
40dbc1cd
...
...
@@ -10,4 +10,5 @@ target_link_libraries(
PRIVATE sdb
PRIVATE transport
PRIVATE cjson
PRIVATE sync
)
\ No newline at end of file
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
40dbc1cd
...
...
@@ -24,6 +24,7 @@
#include "thash.h"
#include "cJSON.h"
#include "mnode.h"
#include "sync.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -78,6 +79,28 @@ typedef enum {
typedef
enum
{
TRN_POLICY_ROLLBACK
=
1
,
TRN_POLICY_RETRY
=
2
}
ETrnPolicy
;
typedef
enum
{
DND_STATUS_OFFLINE
=
0
,
DND_STATUS_READY
=
1
,
DND_STATUS_CREATING
=
2
,
DND_STATUS_DROPPING
=
3
}
EDndStatus
;
typedef
enum
{
DND_REASON_ONLINE
=
0
,
DND_REASON_STATUS_MSG_TIMEOUT
,
DND_REASON_STATUS_NOT_RECEIVED
,
DND_REASON_VERSION_NOT_MATCH
,
DND_REASON_DNODE_ID_NOT_MATCH
,
DND_REASON_CLUSTER_ID_NOT_MATCH
,
DND_REASON_MN_EQUAL_VN_NOT_MATCH
,
DND_REASON_STATUS_INTERVAL_NOT_MATCH
,
DND_REASON_TIME_ZONE_NOT_MATCH
,
DND_REASON_LOCALE_NOT_MATCH
,
DND_REASON_CHARSET_NOT_MATCH
,
DND_REASON_OTHERS
}
EDndReason
;
typedef
struct
STrans
{
int32_t
id
;
ETrnStage
stage
;
...
...
@@ -99,29 +122,32 @@ typedef struct SClusterObj {
}
SClusterObj
;
typedef
struct
SDnodeObj
{
int32_t
id
;
int32_t
vnodes
;
int64_t
createdTime
;
int64_t
updateTime
;
int64_t
lastAccess
;
int64_t
rebootTime
;
// time stamp for last reboot
char
fqdn
[
TSDB_FQDN_LEN
];
char
ep
[
TSDB_EP_LEN
];
uint16_t
port
;
int16_t
numOfCores
;
// from dnode status msg
int8_t
alternativeRole
;
// from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t
status
;
// set in balance function
int8_t
offlineReason
;
int32_t
id
;
int64_t
createdTime
;
int64_t
updateTime
;
int64_t
rebootTime
;
int32_t
accessTimes
;
int16_t
numOfMnodes
;
int16_t
numOfVnodes
;
int16_t
numOfQnodes
;
int16_t
numOfSupportMnodes
;
int16_t
numOfSupportVnodes
;
int16_t
numOfSupportQnodes
;
int16_t
numOfCores
;
EDndStatus
status
;
EDndReason
offlineReason
;
uint16_t
port
;
char
fqdn
[
TSDB_FQDN_LEN
];
char
ep
[
TSDB_EP_LEN
];
}
SDnodeObj
;
typedef
struct
SMnodeObj
{
int32_t
id
;
int8_t
status
;
int8_t
role
;
int32_t
roleTerm
;
int64_t
roleTime
;
int64_t
createdTime
;
int64_t
updateTime
;
ESyncState
role
;
int32_t
roleTerm
;
int64_t
roleTime
;
SDnodeObj
*
pDnode
;
}
SMnodeObj
;
...
...
source/dnode/mnode/impl/inc/mndDnode.h
浏览文件 @
40dbc1cd
...
...
@@ -22,8 +22,10 @@
extern
"C"
{
#endif
int32_t
mndInitDnode
(
SMnode
*
pMnode
);
void
mndCleanupDnode
(
SMnode
*
pMnode
);
int32_t
mndInitDnode
(
SMnode
*
pMnode
);
void
mndCleanupDnode
(
SMnode
*
pMnode
);
SDnodeObj
*
mndAcquireDnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
void
mndReleaseDnode
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
40dbc1cd
...
...
@@ -50,6 +50,12 @@ typedef struct SMnode {
SendMsgToMnodeFp
sendMsgToMnodeFp
;
SendRedirectMsgFp
sendRedirectMsgFp
;
PutMsgToMnodeQFp
putMsgToApplyMsgFp
;
int32_t
sver
;
int32_t
statusInterval
;
int32_t
mnodeEqualVnodeNum
;
char
*
timezone
;
char
*
locale
;
char
*
charset
;
}
SMnode
;
tmr_h
mndGetTimer
(
SMnode
*
pMnode
);
...
...
source/dnode/mnode/impl/inc/mndMnode.h
浏览文件 @
40dbc1cd
...
...
@@ -24,8 +24,7 @@ extern "C" {
int32_t
mndInitMnode
(
SMnode
*
pMnode
);
void
mndCleanupMnode
(
SMnode
*
pMnode
);
void
mndGetMnodeEpSetForPeer
(
SEpSet
*
epSet
,
bool
redirect
);
void
mndGetMnodeEpSetForShell
(
SEpSet
*
epSet
,
bool
redirect
);
bool
mndIsMnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
40dbc1cd
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_
TRANSACTION_INT
_H_
#define _TD_
TRANSACTION_INT
_H_
#ifndef _TD_
MND_TRANS
_H_
#define _TD_
MND_TRANS
_H_
#include "mndInt.h"
...
...
@@ -44,4 +44,4 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
}
#endif
#endif
/*_TD_
TRANSACTION_INT
_H_*/
#endif
/*_TD_
MND_TRANS
_H_*/
source/dnode/mnode/impl/src/mndAcct.c
浏览文件 @
40dbc1cd
...
...
@@ -79,7 +79,7 @@ static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *p
return
0
;
}
static
int32_t
mnodeCreateDefaultAcct
(
S
Sdb
*
pSdb
)
{
static
int32_t
mnodeCreateDefaultAcct
(
S
Mnode
*
pMnode
)
{
int32_t
code
=
0
;
SAcctObj
acctObj
=
{
0
};
...
...
@@ -98,7 +98,7 @@ static int32_t mnodeCreateDefaultAcct(SSdb *pSdb) {
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
return
sdbWrite
(
pSdb
,
pRaw
);
return
sdbWrite
(
p
Mnode
->
p
Sdb
,
pRaw
);
}
int32_t
mndInitAcct
(
SMnode
*
pMnode
)
{
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
40dbc1cd
...
...
@@ -14,8 +14,330 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mndInt.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndTrans.h"
#include "ttime.h"
int32_t
mndInitDnode
(
SMnode
*
pMnode
)
{
return
0
;
}
void
mndCleanupDnode
(
SMnode
*
pMnode
)
{}
\ No newline at end of file
#define SDB_DNODE_VER 1
static
char
*
offlineReason
[]
=
{
""
,
"status msg timeout"
,
"status not received"
,
"version not match"
,
"dnodeId not match"
,
"clusterId not match"
,
"mnEqualVn not match"
,
"interval not match"
,
"timezone not match"
,
"locale not match"
,
"charset not match"
,
"unknown"
,
};
static
SSdbRaw
*
mndDnodeActionEncode
(
SDnodeObj
*
pDnode
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_DNODE
,
SDB_DNODE_VER
,
sizeof
(
SDnodeObj
));
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
pDnode
->
id
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pDnode
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pDnode
->
updateTime
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pDnode
->
port
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
return
pRaw
;
}
static
SSdbRow
*
mndDnodeActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
SDB_DNODE_VER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to decode dnode since %s"
,
terrstr
());
return
NULL
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SDnodeObj
));
SDnodeObj
*
pDnode
=
sdbGetRowObj
(
pRow
);
if
(
pDnode
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pDnode
->
id
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pDnode
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pDnode
->
updateTime
)
SDB_GET_INT16
(
pRaw
,
pRow
,
dataPos
,
&
pDnode
->
port
)
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
)
return
pRow
;
}
static
void
mnodeResetDnode
(
SDnodeObj
*
pDnode
)
{
pDnode
->
rebootTime
=
0
;
pDnode
->
accessTimes
=
0
;
pDnode
->
numOfCores
=
0
;
pDnode
->
numOfMnodes
=
0
;
pDnode
->
numOfVnodes
=
0
;
pDnode
->
numOfQnodes
=
0
;
pDnode
->
numOfSupportMnodes
=
0
;
pDnode
->
numOfSupportVnodes
=
0
;
pDnode
->
numOfSupportQnodes
=
0
;
pDnode
->
status
=
DND_STATUS_OFFLINE
;
pDnode
->
offlineReason
=
DND_REASON_STATUS_NOT_RECEIVED
;
snprintf
(
pDnode
->
ep
,
TSDB_EP_LEN
,
"%s:%u"
,
pDnode
->
fqdn
,
pDnode
->
port
);
}
static
int32_t
mndDnodeActionInsert
(
SSdb
*
pSdb
,
SDnodeObj
*
pDnode
)
{
mnodeResetDnode
(
pDnode
);
return
0
;
}
static
int32_t
mndDnodeActionDelete
(
SSdb
*
pSdb
,
SDnodeObj
*
pDnode
)
{
return
0
;
}
static
int32_t
mndDnodeActionUpdate
(
SSdb
*
pSdb
,
SDnodeObj
*
pSrcDnode
,
SDnodeObj
*
pDstDnode
)
{
pSrcDnode
->
id
=
pDstDnode
->
id
;
pSrcDnode
->
createdTime
=
pDstDnode
->
createdTime
;
pSrcDnode
->
updateTime
=
pDstDnode
->
updateTime
;
pSrcDnode
->
port
=
pDstDnode
->
port
;
memcpy
(
pSrcDnode
->
fqdn
,
pDstDnode
->
fqdn
,
TSDB_FQDN_LEN
);
mnodeResetDnode
(
pSrcDnode
);
}
static
int32_t
mndCreateDefaultDnode
(
SMnode
*
pMnode
)
{
SDnodeObj
dnodeObj
=
{
0
};
dnodeObj
.
id
=
1
;
dnodeObj
.
createdTime
=
taosGetTimestampMs
();
dnodeObj
.
updateTime
=
dnodeObj
.
createdTime
;
dnodeObj
.
port
=
pMnode
->
replicas
[
0
].
port
;
memcpy
(
&
dnodeObj
.
fqdn
,
pMnode
->
replicas
[
0
].
fqdn
,
TSDB_FQDN_LEN
);
SSdbRaw
*
pRaw
=
mndDnodeActionEncode
(
&
dnodeObj
);
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
return
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
}
static
SDnodeObj
*
mndAcquireDnodeByEp
(
SMnode
*
pMnode
,
char
*
pEpStr
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SDnodeObj
*
pDnode
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_DNODE
,
pIter
,
(
void
**
)
&
pDnode
);
if
(
pIter
==
NULL
)
break
;
if
(
strncasecmp
(
pEpStr
,
pDnode
->
ep
,
TSDB_EP_LEN
)
==
0
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
return
pDnode
;
}
}
return
NULL
;
}
static
int32_t
mndGetDnodeSize
(
SMnode
*
pMnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbGetSize
(
pSdb
,
SDB_DNODE
);
}
static
void
mndGetDnodeData
(
SMnode
*
pMnode
,
SDnodeEps
*
pEps
,
int32_t
numOfEps
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
i
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SDnodeObj
*
pDnode
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_DNODE
,
pIter
,
(
void
**
)
&
pDnode
);
if
(
pIter
==
NULL
)
break
;
if
(
i
>=
numOfEps
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
break
;
}
SDnodeEp
*
pEp
=
&
pEps
->
eps
[
i
];
pEp
->
id
=
htonl
(
pDnode
->
id
);
pEp
->
port
=
htons
(
pDnode
->
port
);
memcpy
(
pEp
->
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
pEp
->
isMnode
=
0
;
if
(
mndIsMnode
(
pMnode
,
pDnode
->
id
))
{
pEp
->
isMnode
=
1
;
}
i
++
;
}
pEps
->
num
=
i
;
}
static
int32_t
mndCheckClusterCfgPara
(
SMnode
*
pMnode
,
const
SClusterCfg
*
pCfg
)
{
if
(
pCfg
->
mnodeEqualVnodeNum
!=
pMnode
->
mnodeEqualVnodeNum
)
{
mError
(
"
\"
mnodeEqualVnodeNum
\"
[%d - %d] cfg inconsistent"
,
pCfg
->
mnodeEqualVnodeNum
,
pMnode
->
mnodeEqualVnodeNum
);
return
DND_REASON_MN_EQUAL_VN_NOT_MATCH
;
}
if
(
pCfg
->
statusInterval
!=
pMnode
->
statusInterval
)
{
mError
(
"
\"
statusInterval
\"
[%d - %d] cfg inconsistent"
,
pCfg
->
statusInterval
,
pMnode
->
statusInterval
);
return
DND_REASON_STATUS_INTERVAL_NOT_MATCH
;
}
int64_t
checkTime
=
0
;
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
if
((
0
!=
strcasecmp
(
pCfg
->
timezone
,
pMnode
->
timezone
))
&&
(
checkTime
!=
pCfg
->
checkTime
))
{
mError
(
"
\"
timezone
\"
[%s - %s] [%"
PRId64
" - %"
PRId64
"] cfg inconsistent"
,
pCfg
->
timezone
,
tsTimezone
,
pCfg
->
checkTime
,
checkTime
);
return
DND_REASON_TIME_ZONE_NOT_MATCH
;
}
if
(
0
!=
strcasecmp
(
pCfg
->
locale
,
pMnode
->
locale
))
{
mError
(
"
\"
locale
\"
[%s - %s] cfg parameters inconsistent"
,
pCfg
->
locale
,
pMnode
->
locale
);
return
DND_REASON_LOCALE_NOT_MATCH
;
}
if
(
0
!=
strcasecmp
(
pCfg
->
charset
,
pMnode
->
charset
))
{
mError
(
"
\"
charset
\"
[%s - %s] cfg parameters inconsistent."
,
pCfg
->
charset
,
pMnode
->
charset
);
return
DND_REASON_CHARSET_NOT_MATCH
;
}
return
0
;
}
static
int32_t
mndProcessStatusMsg
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
)
{
SStatusMsg
*
pStatus
=
pMsg
->
rpcMsg
.
pCont
;
pStatus
->
sver
=
htonl
(
pStatus
->
sver
);
pStatus
->
dnodeId
=
htonl
(
pStatus
->
dnodeId
);
pStatus
->
clusterId
=
htobe64
(
pStatus
->
clusterId
);
pStatus
->
rebootTime
=
htonl
(
pStatus
->
rebootTime
);
pStatus
->
numOfCores
=
htons
(
pStatus
->
numOfCores
);
pStatus
->
numOfSupportMnodes
=
htons
(
pStatus
->
numOfSupportMnodes
);
pStatus
->
numOfSupportVnodes
=
htons
(
pStatus
->
numOfSupportVnodes
);
pStatus
->
numOfSupportQnodes
=
htons
(
pStatus
->
numOfSupportQnodes
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
pStatus
->
clusterCfg
.
statusInterval
);
pStatus
->
clusterCfg
.
mnodeEqualVnodeNum
=
htonl
(
pStatus
->
clusterCfg
.
mnodeEqualVnodeNum
);
pStatus
->
clusterCfg
.
checkTime
=
htobe64
(
pStatus
->
clusterCfg
.
checkTime
);
SDnodeObj
*
pDnode
=
NULL
;
if
(
pStatus
->
dnodeId
==
0
)
{
pDnode
=
mndAcquireDnodeByEp
(
pMnode
,
pStatus
->
dnodeEp
);
if
(
pDnode
==
NULL
)
{
mDebug
(
"dnode:%s, not created yet"
,
pStatus
->
dnodeEp
);
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
}
}
else
{
pDnode
=
mndAcquireDnode
(
pMnode
,
pStatus
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
pDnode
=
mndAcquireDnodeByEp
(
pMnode
,
pStatus
->
dnodeEp
);
if
(
pDnode
!=
NULL
&&
pDnode
->
status
!=
DND_STATUS_READY
)
{
pDnode
->
offlineReason
=
DND_REASON_DNODE_ID_NOT_MATCH
;
}
mError
(
"dnode:%d, %s not exist"
,
pStatus
->
dnodeId
,
pStatus
->
dnodeEp
);
mndReleaseDnode
(
pMnode
,
pDnode
);
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
}
}
if
(
pStatus
->
sver
!=
pMnode
->
sver
)
{
if
(
pDnode
!=
NULL
&&
pDnode
->
status
!=
DND_STATUS_READY
)
{
pDnode
->
offlineReason
=
DND_REASON_VERSION_NOT_MATCH
;
}
mndReleaseDnode
(
pMnode
,
pDnode
);
mError
(
"dnode:%d, status msg version:%d not match cluster:%d"
,
pStatus
->
dnodeId
,
pStatus
->
sver
,
pMnode
->
sver
);
return
TSDB_CODE_MND_INVALID_MSG_VERSION
;
}
int64_t
clusterId
=
mndGetClusterId
(
pMnode
);
if
(
pStatus
->
dnodeId
==
0
)
{
mDebug
(
"dnode:%d %s, first access, set clusterId %"
PRId64
,
pDnode
->
id
,
pDnode
->
ep
,
clusterId
);
}
else
{
if
(
pStatus
->
clusterId
!=
clusterId
)
{
if
(
pDnode
!=
NULL
&&
pDnode
->
status
!=
DND_STATUS_READY
)
{
pDnode
->
offlineReason
=
DND_REASON_CLUSTER_ID_NOT_MATCH
;
}
mError
(
"dnode:%d, clusterId %"
PRId64
" not match exist %"
PRId64
,
pDnode
->
id
,
pStatus
->
clusterId
,
clusterId
);
mndReleaseDnode
(
pMnode
,
pDnode
);
return
TSDB_CODE_MND_INVALID_CLUSTER_ID
;
}
else
{
pDnode
->
accessTimes
++
;
mTrace
(
"dnode:%d, status received, access times %d"
,
pDnode
->
id
,
pDnode
->
accessTimes
);
}
}
if
(
pDnode
->
status
==
DND_STATUS_OFFLINE
)
{
// Verify whether the cluster parameters are consistent when status change from offline to ready
int32_t
ret
=
mndCheckClusterCfgPara
(
pMnode
,
&
pStatus
->
clusterCfg
);
if
(
0
!=
ret
)
{
pDnode
->
offlineReason
=
ret
;
mError
(
"dnode:%d, cluster cfg inconsistent since:%s"
,
pDnode
->
id
,
offlineReason
[
ret
]);
mndReleaseDnode
(
pMnode
,
pDnode
);
return
TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT
;
}
mInfo
(
"dnode:%d, from offline to online"
,
pDnode
->
id
);
}
pDnode
->
rebootTime
=
pStatus
->
rebootTime
;
pDnode
->
numOfCores
=
pStatus
->
numOfCores
;
pDnode
->
numOfSupportMnodes
=
pStatus
->
numOfSupportMnodes
;
pDnode
->
numOfSupportVnodes
=
pStatus
->
numOfSupportVnodes
;
pDnode
->
numOfSupportQnodes
=
pStatus
->
numOfSupportQnodes
;
pDnode
->
status
=
DND_STATUS_READY
;
int32_t
numOfEps
=
mndGetDnodeSize
(
pMnode
);
int32_t
contLen
=
sizeof
(
SStatusRsp
)
+
numOfEps
*
sizeof
(
SDnodeEp
);
SStatusRsp
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
mndReleaseDnode
(
pMnode
,
pDnode
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pRsp
->
dnodeCfg
.
dnodeId
=
htonl
(
pDnode
->
id
);
pRsp
->
dnodeCfg
.
dropped
=
0
;
pRsp
->
dnodeCfg
.
clusterId
=
htobe64
(
clusterId
);
mndGetDnodeData
(
pMnode
,
&
pRsp
->
dnodeEps
,
numOfEps
);
pMsg
->
rpcRsp
.
len
=
contLen
;
pMsg
->
rpcRsp
.
rsp
=
pRsp
;
mndReleaseDnode
(
pMnode
,
pDnode
);
return
0
;
}
static
int32_t
mndProcessCreateDnodeMsg
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessDropDnodeMsg
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessConfigDnodeMsg
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
)
{
return
0
;
}
int32_t
mndInitDnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_DNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
deployFp
=
(
SdbDeployFp
)
mndCreateDefaultDnode
,
.
encodeFp
=
(
SdbEncodeFp
)
mndDnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndDnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndDnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndDnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndDnodeActionDelete
};
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_DNODE
,
mndProcessCreateDnodeMsg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_DNODE
,
mndProcessDropDnodeMsg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CONFIG_DNODE
,
mndProcessConfigDnodeMsg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_STATUS_RSP
,
mndProcessStatusMsg
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupDnode
(
SMnode
*
pMnode
)
{}
SDnodeObj
*
mndAcquireDnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
dnodeId
);
}
void
mndReleaseDnode
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pDnode
);
}
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
40dbc1cd
...
...
@@ -14,11 +14,120 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mndInt.h"
#include "mndTrans.h"
int32_t
mndInitMnode
(
SMnode
*
pMnode
)
{
return
0
;
}
void
mndCleanupMnode
(
SMnode
*
pMnode
)
{}
#define SDB_MNODE_VER 1
void
mndGetMnodeEpSetForPeer
(
SEpSet
*
epSet
,
bool
redirect
)
{}
void
mndGetMnodeEpSetForShell
(
SEpSet
*
epSet
,
bool
redirect
)
{}
\ No newline at end of file
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
pMnodeObj
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_MNODE
,
SDB_MNODE_VER
,
sizeof
(
SMnodeObj
));
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
pMnodeObj
->
id
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pMnodeObj
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pMnodeObj
->
updateTime
)
return
pRaw
;
}
static
SSdbRow
*
mndMnodeActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
SDB_MNODE_VER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to decode mnode since %s"
,
terrstr
());
return
NULL
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SMnodeObj
));
SMnodeObj
*
pMnodeObj
=
sdbGetRowObj
(
pRow
);
if
(
pMnodeObj
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pMnodeObj
->
id
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pMnodeObj
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pMnodeObj
->
updateTime
)
return
pRow
;
}
static
void
mnodeResetMnode
(
SMnodeObj
*
pMnodeObj
)
{
pMnodeObj
->
role
=
TAOS_SYNC_STATE_FOLLOWER
;
pMnodeObj
->
roleTerm
=
0
;
pMnodeObj
->
roleTime
=
0
;
}
static
int32_t
mndMnodeActionInsert
(
SSdb
*
pSdb
,
SMnodeObj
*
pMnodeObj
)
{
pMnodeObj
->
pDnode
=
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
pMnodeObj
->
id
);
if
(
pMnodeObj
->
pDnode
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
}
mnodeResetMnode
(
pMnodeObj
);
return
0
;
}
static
int32_t
mndMnodeActionDelete
(
SSdb
*
pSdb
,
SMnodeObj
*
pMnodeObj
)
{
if
(
pMnodeObj
->
pDnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
pMnodeObj
->
pDnode
);
pMnodeObj
->
pDnode
=
NULL
;
}
return
0
;
}
static
int32_t
mndMnodeActionUpdate
(
SSdb
*
pSdb
,
SMnodeObj
*
pSrcMnode
,
SMnodeObj
*
pDstMnode
)
{
pSrcMnode
->
id
=
pDstMnode
->
id
;
pSrcMnode
->
createdTime
=
pDstMnode
->
createdTime
;
pSrcMnode
->
updateTime
=
pDstMnode
->
updateTime
;
mnodeResetMnode
(
pSrcMnode
);
}
static
int32_t
mndCreateDefaultMnode
(
SMnode
*
pMnode
)
{
SMnodeObj
mnodeObj
=
{
0
};
mnodeObj
.
id
=
0
;
mnodeObj
.
createdTime
=
taosGetTimestampMs
();
mnodeObj
.
updateTime
=
mnodeObj
.
createdTime
;
SSdbRaw
*
pRaw
=
mndMnodeActionEncode
(
&
mnodeObj
);
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
return
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
}
static
int32_t
mndProcessCreateMnodeMsg
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessDropMnodeMsg
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
)
{
return
0
;
}
int32_t
mndInitMnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_MNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
deployFp
=
(
SdbDeployFp
)
mndCreateDefaultMnode
,
.
encodeFp
=
(
SdbEncodeFp
)
mndMnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndMnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndMnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndMnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndMnodeActionDelete
};
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_MNODE
,
mndProcessCreateMnodeMsg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_MNODE
,
mndProcessDropMnodeMsg
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupMnode
(
SMnode
*
pMnode
)
{}
bool
mndIsMnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SMnodeObj
*
pMnodeObj
=
sdbAcquire
(
pSdb
,
SDB_MNODE
,
&
dnodeId
);
if
(
pMnodeObj
==
NULL
)
{
return
false
;
}
sdbRelease
(
pSdb
,
pMnodeObj
);
return
true
;
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
40dbc1cd
...
...
@@ -21,7 +21,7 @@
#define SDB_USER_VER 1
static
SSdbRaw
*
mndUserActionEncode
(
SUserObj
*
pUser
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_USER
,
SDB_USER_VER
,
sizeof
(
S
Acct
Obj
));
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_USER
,
SDB_USER_VER
,
sizeof
(
S
User
Obj
));
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
...
...
@@ -97,7 +97,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDs
return
0
;
}
static
int32_t
mndCreateDefaultUser
(
S
Sdb
*
pSdb
,
char
*
acct
,
char
*
user
,
char
*
pass
)
{
static
int32_t
mndCreateDefaultUser
(
S
Mnode
*
pMnode
,
char
*
acct
,
char
*
user
,
char
*
pass
)
{
SUserObj
userObj
=
{
0
};
tstrncpy
(
userObj
.
user
,
user
,
TSDB_USER_LEN
);
tstrncpy
(
userObj
.
acct
,
acct
,
TSDB_USER_LEN
);
...
...
@@ -113,15 +113,15 @@ static int32_t mndCreateDefaultUser(SSdb *pSdb, char *acct, char *user, char *pa
if
(
pRaw
==
NULL
)
return
-
1
;
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
return
sdbWrite
(
pSdb
,
pRaw
);
return
sdbWrite
(
p
Mnode
->
p
Sdb
,
pRaw
);
}
static
int32_t
mndCreateDefaultUsers
(
S
Sdb
*
pSdb
)
{
if
(
mndCreateDefaultUser
(
p
Sdb
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
)
!=
0
)
{
static
int32_t
mndCreateDefaultUsers
(
S
Mnode
*
pMnode
)
{
if
(
mndCreateDefaultUser
(
p
Mnode
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
)
!=
0
)
{
return
-
1
;
}
if
(
mndCreateDefaultUser
(
p
Sdb
,
TSDB_DEFAULT_USER
,
"_"
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
)
!=
0
)
{
if
(
mndCreateDefaultUser
(
p
Mnode
,
TSDB_DEFAULT_USER
,
"_"
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
)
!=
0
)
{
return
-
1
;
}
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
40dbc1cd
...
...
@@ -225,15 +225,22 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode
->
sendMsgToDnodeFp
=
pOption
->
sendMsgToDnodeFp
;
pMnode
->
sendMsgToMnodeFp
=
pOption
->
sendMsgToMnodeFp
;
pMnode
->
sendRedirectMsgFp
=
pOption
->
sendRedirectMsgFp
;
pMnode
->
sver
=
pOption
->
sver
;
pMnode
->
statusInterval
=
pOption
->
statusInterval
;
pMnode
->
mnodeEqualVnodeNum
=
pOption
->
mnodeEqualVnodeNum
;
pMnode
->
timezone
=
strdup
(
pOption
->
timezone
);
pMnode
->
locale
=
strdup
(
pOption
->
locale
);
pMnode
->
charset
=
strdup
(
pOption
->
charset
);
if
(
pMnode
->
sendMsgToDnodeFp
==
NULL
||
pMnode
->
sendMsgToMnodeFp
==
NULL
||
pMnode
->
sendRedirectMsgFp
==
NULL
||
pMnode
->
putMsgToApplyMsgFp
==
NULL
)
{
pMnode
->
putMsgToApplyMsgFp
==
NULL
||
pMnode
->
dnodeId
<
0
||
pMnode
->
clusterId
<
0
||
pMnode
->
statusInterval
<
1
||
pOption
->
mnodeEqualVnodeNum
<
0
)
{
terrno
=
TSDB_CODE_MND_APP_ERROR
;
return
-
1
;
}
if
(
pMnode
->
dnodeId
<
0
||
pMnode
->
clusterId
<
0
)
{
terrno
=
TSDB_CODE_
MND_APP_ERROR
;
if
(
pMnode
->
timezone
==
NULL
||
pMnode
->
locale
==
NULL
||
pMnode
->
charset
==
NULL
)
{
terrno
=
TSDB_CODE_
OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -299,6 +306,9 @@ void mndClose(SMnode *pMnode) {
mDebug
(
"start to close mnode"
);
mndCleanupSteps
(
pMnode
,
-
1
);
tfree
(
pMnode
->
path
);
tfree
(
pMnode
->
charset
);
tfree
(
pMnode
->
locale
);
tfree
(
pMnode
->
timezone
);
tfree
(
pMnode
);
mDebug
(
"mnode is closed"
);
}
...
...
source/dnode/mnode/sdb/inc/sdbInt.h
浏览文件 @
40dbc1cd
...
...
@@ -38,8 +38,8 @@ extern "C" {
typedef
struct
SSdbRaw
{
int8_t
type
;
int8_t
sver
;
int8_t
status
;
int8_t
sver
;
int8_t
reserved
;
int32_t
dataLen
;
char
pData
[];
...
...
@@ -53,6 +53,7 @@ typedef struct SSdbRow {
}
SSdbRow
;
typedef
struct
SSdb
{
SMnode
*
pMnode
;
char
*
currDir
;
char
*
syncDir
;
char
*
tmpDir
;
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
40dbc1cd
...
...
@@ -27,7 +27,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
}
char
path
[
PATH_MAX
+
100
];
snprintf
(
path
,
PATH_MAX
+
100
,
"%s
"
,
pOption
->
path
);
snprintf
(
path
,
PATH_MAX
+
100
,
"%s
%sdata"
,
pOption
->
path
,
TD_DIRSEP
);
pSdb
->
currDir
=
strdup
(
path
);
snprintf
(
path
,
PATH_MAX
+
100
,
"%s%ssync"
,
pOption
->
path
,
TD_DIRSEP
);
pSdb
->
syncDir
=
strdup
(
path
);
...
...
@@ -44,6 +44,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
taosInitRWLatch
(
&
pSdb
->
locks
[
i
]);
}
pSdb
->
pMnode
=
pOption
->
pMnode
;
mDebug
(
"sdb init successfully"
);
return
pSdb
;
}
...
...
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
40dbc1cd
...
...
@@ -46,7 +46,7 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
SdbDeployFp
fp
=
pSdb
->
deployFps
[
i
];
if
(
fp
==
NULL
)
continue
;
if
((
*
fp
)(
pSdb
)
!=
0
)
{
if
((
*
fp
)(
pSdb
->
pMnode
)
!=
0
)
{
mError
(
"failed to deploy sdb:%d since %s"
,
i
,
terrstr
());
return
-
1
;
}
...
...
source/dnode/mnode/sdb/src/sdbRaw.c
浏览文件 @
40dbc1cd
...
...
@@ -61,6 +61,21 @@ int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) {
return
0
;
}
int32_t
sdbSetRawInt16
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int16_t
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
sizeof
(
int16_t
)
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
*
(
int16_t
*
)(
pRaw
->
pData
+
dataPos
)
=
val
;
return
0
;
}
int32_t
sdbSetRawInt64
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int64_t
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
...
...
@@ -146,6 +161,21 @@ int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) {
return
0
;
}
int32_t
sdbGetRawInt16
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int16_t
*
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
if
(
dataPos
+
sizeof
(
int16_t
)
>
pRaw
->
dataLen
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_LEN
;
return
-
1
;
}
*
val
=
*
(
int16_t
*
)(
pRaw
->
pData
+
dataPos
);
return
0
;
}
int32_t
sdbGetRawInt64
(
SSdbRaw
*
pRaw
,
int32_t
dataPos
,
int64_t
*
val
)
{
if
(
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录