Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
99c71628
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
99c71628
编写于
11月 02, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact dnode-mnode
上级
84effaab
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
297 addition
and
201 deletion
+297
-201
include/common/taosmsg.h
include/common/taosmsg.h
+2
-4
include/server/mnode/mnode.h
include/server/mnode/mnode.h
+16
-95
include/server/vnode/vnode.h
include/server/vnode/vnode.h
+2
-38
include/util/taoserror.h
include/util/taoserror.h
+7
-5
source/dnode/mgmt/src/dnodeDnode.c
source/dnode/mgmt/src/dnodeDnode.c
+28
-29
source/dnode/mgmt/src/dnodeMnode.c
source/dnode/mgmt/src/dnodeMnode.c
+234
-29
source/dnode/mnode/inc/mnodeInt.h
source/dnode/mnode/inc/mnodeInt.h
+2
-0
source/dnode/mnode/src/mondeInt.c
source/dnode/mnode/src/mondeInt.c
+3
-0
source/dnode/vnode/impl/src/vnodeInt.c
source/dnode/vnode/impl/src/vnodeInt.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+2
-0
未找到文件。
include/common/taosmsg.h
浏览文件 @
99c71628
...
...
@@ -832,10 +832,8 @@ typedef struct {
}
SCreateDnodeMsg
,
SDropDnodeMsg
;
typedef
struct
{
int32_t
dnodeId
;
int32_t
mnodeNum
;
SDnodeEp
mnodeEps
[];
}
SCreateMnodeMsg
;
int32_t
dnodeId
;
}
SCreateMnodeMsg
,
SDropMnodeMsg
;
typedef
struct
{
int32_t
dnodeId
;
...
...
include/server/mnode/mnode.h
浏览文件 @
99c71628
...
...
@@ -20,41 +20,23 @@
extern
"C"
{
#endif
typedef
enum
{
MN_STATUS_UNINIT
=
0
,
MN_STATUS_INIT
=
1
,
MN_STATUS_READY
=
2
,
MN_STATUS_CLOSING
=
3
}
EMnStatus
;
typedef
struct
{
int64_t
numOfDnode
;
int64_t
numOfMnode
;
int64_t
numOfVgroup
;
int64_t
numOfDatabase
;
int64_t
numOfSuperTable
;
int64_t
numOfChildTable
;
int64_t
numOfColumn
;
int64_t
totalPoints
;
int64_t
totalStorage
;
int64_t
compStorage
;
}
SMnodeStat
;
typedef
struct
{
/**
* Send messages to other dnodes, such as create vnode message.
*
* @param epSet, the endpoint list of the dnodes.
* @param rpcMsg, message to be sent.
*/
void
(
*
SendMsgToDnode
)(
struct
SEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
/**
* Send messages to mnode, such as config message.
*
* @param rpcMsg, message to be sent.
*/
void
(
*
SendMsgToMnode
)(
struct
SRpcMsg
*
rpcMsg
);
/**
* Send redirect message to dnode or shell.
*
* @param rpcMsg, message to be sent.
* @param forShell, used to identify whether to send to shell or dnode.
*/
void
(
*
SendRedirectMsg
)(
struct
SRpcMsg
*
rpcMsg
,
bool
forShell
);
/**
* Get the corresponding endpoint information from dnodeId.
*
* @param dnode, the instance of dDnode module.
* @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
void
(
*
GetDnodeEp
)(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
}
SMnodeFp
;
...
...
@@ -64,77 +46,16 @@ typedef struct {
int32_t
dnodeId
;
}
SMnodePara
;
/**
* Initialize and start mnode module.
*
* @param para, initialization parameters.
* @return Error code.
*/
int32_t
mnodeInit
(
SMnodePara
para
);
/**
* Stop and cleanup mnode module.
*/
void
mnodeCleanup
();
/**
* Deploy mnode instances in dnode.
*
* @return Error Code.
*/
void
mnodeCleanup
();
int32_t
mnodeDeploy
();
void
mnodeUnDeploy
();
int32_t
mnodeStart
();
void
mnodeStop
();
/**
* Delete the mnode instance deployed in dnode.
*/
void
mnodeUnDeploy
();
/**
* Whether the mnode is in service.
*
* @return Server status.
*/
EMnStatus
mnodeGetStatus
();
typedef
struct
{
int64_t
numOfDnode
;
int64_t
numOfMnode
;
int64_t
numOfVgroup
;
int64_t
numOfDatabase
;
int64_t
numOfSuperTable
;
int64_t
numOfChildTable
;
int64_t
numOfColumn
;
int64_t
totalPoints
;
int64_t
totalStorage
;
int64_t
compStorage
;
}
SMnodeStat
;
/**
* Get the statistical information of Mnode.
*
* @param stat, statistical information.
* @return Error Code.
*/
int32_t
mnodeGetStatistics
(
SMnodeStat
*
stat
);
/**
* Get the auth information of Mnode.
*
* @param user, username.
* @param spi, security parameter index.
* @param encrypt, encrypt algorithm.
* @param secret, key for authentication.
* @param ckey, ciphering key.
* @return Error Code.
*/
int32_t
mnodeRetriveAuth
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
/**
* Interface for processing messages.
*
* @param rpcMsg, message to be processed.
* @return Error code.
*/
void
mnodeProcessMsg
(
SRpcMsg
*
rpcMsg
);
#ifdef __cplusplus
...
...
include/server/vnode/vnode.h
浏览文件 @
99c71628
...
...
@@ -66,41 +66,12 @@ typedef struct SVnodeMsg {
char
pCont
[];
}
SVnodeMsg
;
/**
* Start initialize vnode module.
*
* @return Error code.
*/
int32_t
vnodeInit
();
void
vnodeCleanup
();
/**
* Cleanup vnode module.
*/
void
vnodeCleanup
();
/**
* Get the statistical information of vnode.
*
* @param pVnode,
* @param pStat, statistical information.
* @return Error Code.
*/
int32_t
vnodeGetStatistics
(
SVnode
*
pVnode
,
SVnodeStatisic
*
pStat
);
/**
* Get the status of all vnodes.
*
* @param pVnode,
* @param status, status information.
* @return Error Code.
*/
int32_t
vnodeGetStatus
(
SVnode
*
pVnode
,
SVnodeStatus
*
pStatus
);
/**
* Operation functions of vnode
*
* @return Error Code.
*/
SVnode
*
vnodeOpen
(
int32_t
vgId
,
const
char
*
path
);
void
vnodeClose
(
SVnode
*
pVnode
);
int32_t
vnodeAlter
(
SVnode
*
pVnode
,
const
SVnodeCfg
*
pCfg
);
...
...
@@ -109,14 +80,7 @@ int32_t vnodeDrop(SVnode *pVnode);
int32_t
vnodeCompact
(
SVnode
*
pVnode
);
int32_t
vnodeSync
(
SVnode
*
pVnode
);
/**
* Interface for processing messages.
*
* @param pVnode,
* @param pMsg, message to be processed.
*
*/
int32_t
vnodeProcessMsg
(
SVnode
*
pVnode
,
SVnodeMsg
*
pMsg
);
void
vnodeProcessMsg
(
SVnode
*
pVnode
,
SVnodeMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
include/util/taoserror.h
浏览文件 @
99c71628
...
...
@@ -216,11 +216,13 @@ int32_t* taosGetErrno();
// dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed")
#define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory")
#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0402) //"No permission for disk files in dnode")
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0403) //"Invalid message length")
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0404) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0405) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0406) //"Dnode is exiting"
#define TSDB_CODE_DND_DNODE_ID_NOT_MATCHED TAOS_DEF_ERROR_CODE(0, 0x0402) //"Dnode Id not matched")
#define TSDB_CODE_DND_MNODE_ALREADY_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0403) //"Mnode already deployed")
#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0404) //"No permission for disk files in dnode")
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0405) //"Invalid message length")
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting"
// vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
...
...
source/dnode/mgmt/src/dnodeDnode.c
浏览文件 @
99c71628
...
...
@@ -35,7 +35,6 @@ static struct {
int8_t
threadStop
;
pthread_t
*
threadId
;
pthread_mutex_t
mutex
;
MsgFp
msgFp
[
TSDB_MSG_TYPE_MAX
];
}
tsDnode
=
{
0
};
int32_t
dnodeGetDnodeId
()
{
...
...
@@ -127,7 +126,7 @@ static void dnodeUpdateMnodeEpSet(SEpSet *pEpSet) {
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
}
static
void
dnodePrint
Ep
s
()
{
static
void
dnodePrint
Dnode
s
()
{
dDebug
(
"print dnode endpoint list, num:%d"
,
tsDnode
.
dnodeEps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
tsDnode
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
tsDnode
.
dnodeEps
->
dnodeEps
[
i
];
...
...
@@ -135,7 +134,7 @@ static void dnodePrintEps() {
}
}
static
void
dnodeReset
Ep
s
(
SDnodeEps
*
pEps
)
{
static
void
dnodeReset
Dnode
s
(
SDnodeEps
*
pEps
)
{
assert
(
pEps
!=
NULL
);
int32_t
size
=
sizeof
(
SDnodeEps
)
+
pEps
->
dnodeNum
*
sizeof
(
SDnodeEp
);
...
...
@@ -171,7 +170,7 @@ static void dnodeResetEps(SDnodeEps *pEps) {
taosHashPut
(
tsDnode
.
dnodeHash
,
&
ep
->
dnodeId
,
sizeof
(
int32_t
),
ep
,
sizeof
(
SDnodeEp
));
}
dnodePrint
Ep
s
();
dnodePrint
Dnode
s
();
}
static
bool
dnodeIsEpChanged
(
int32_t
dnodeId
,
char
*
epStr
)
{
...
...
@@ -189,7 +188,7 @@ static bool dnodeIsEpChanged(int32_t dnodeId, char *epStr) {
return
changed
;
}
static
int32_t
dnodeRead
Ep
s
()
{
static
int32_t
dnodeRead
Dnode
s
()
{
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
...
...
@@ -199,59 +198,59 @@ static int32_t dnodeReadEps() {
fp
=
fopen
(
tsDnode
.
file
,
"r"
);
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
root
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dnodeId not found"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
tsDnode
.
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterId"
);
if
(
!
clusterId
||
clusterId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since clusterId not found"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
tsDnode
.
clusterId
=
atoll
(
clusterId
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dropped not found"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
tsDnode
.
dropped
=
atoi
(
dropped
->
valuestring
);
cJSON
*
dnodeInfos
=
cJSON_GetObjectItem
(
root
,
"dnodeInfos"
);
if
(
!
dnodeInfos
||
dnodeInfos
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since dnodeInfos not found"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
int32_t
dnodeInfosSize
=
cJSON_GetArraySize
(
dnodeInfos
);
if
(
dnodeInfosSize
<=
0
)
{
dError
(
"failed to read %s since dnodeInfos size:%d invalid"
,
tsDnode
.
file
,
dnodeInfosSize
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
tsDnode
.
dnodeEps
=
calloc
(
1
,
dnodeInfosSize
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
));
if
(
tsDnode
.
dnodeEps
==
NULL
)
{
dError
(
"failed to calloc dnodeEpList since %s"
,
strerror
(
errno
));
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
tsDnode
.
dnodeEps
->
dnodeNum
=
dnodeInfosSize
;
...
...
@@ -264,36 +263,36 @@ static int32_t dnodeReadEps() {
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodeId not found"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
pEp
->
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
isMnode
=
cJSON_GetObjectItem
(
dnodeInfo
,
"isMnode"
);
if
(
!
isMnode
||
isMnode
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, isMnode not found"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
pEp
->
isMnode
=
atoi
(
isMnode
->
valuestring
);
cJSON
*
dnodeFqdn
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeFqdn"
);
if
(
!
dnodeFqdn
||
dnodeFqdn
->
type
!=
cJSON_String
||
dnodeFqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s, dnodeFqdn not found"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
tstrncpy
(
pEp
->
dnodeFqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
dnodePort
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodePort"
);
if
(
!
dnodePort
||
dnodePort
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodePort not found"
,
tsDnode
.
file
);
goto
PRASE_
EPS
_OVER
;
goto
PRASE_
DNODE
_OVER
;
}
pEp
->
dnodePort
=
atoi
(
dnodePort
->
valuestring
);
}
dInfo
(
"succcessed to read file %s"
,
tsDnode
.
file
);
dnodePrint
Ep
s
();
dnodePrint
Dnode
s
();
PRASE_
EPS
_OVER:
PRASE_
DNODE
_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
...
...
@@ -303,13 +302,13 @@ PRASE_EPS_OVER:
return
-
1
;
}
dnodeReset
Ep
s
(
tsDnode
.
dnodeEps
);
dnodeReset
Dnode
s
(
tsDnode
.
dnodeEps
);
terrno
=
0
;
return
0
;
}
static
int32_t
dnodeWrite
Ep
s
()
{
static
int32_t
dnodeWrite
Dnode
s
()
{
FILE
*
fp
=
fopen
(
tsDnode
.
file
,
"w"
);
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
tsDnode
.
file
,
strerror
(
errno
));
...
...
@@ -391,7 +390,7 @@ static void dnodeUpdateCfg(SDnodeCfg *pCfg) {
tsDnode
.
dropped
=
pCfg
->
dropped
;
dInfo
(
"dnodeId is set to %d, clusterId is set to %"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
dnodeWrite
Ep
s
();
dnodeWrite
Dnode
s
();
pthread_mutex_unlock
(
&
tsDnode
.
mutex
);
}
...
...
@@ -401,13 +400,13 @@ static void dnodeUpdateDnodeEps(SDnodeEps *pEps) {
pthread_mutex_lock
(
&
tsDnode
.
mutex
);
if
(
pEps
->
dnodeNum
!=
tsDnode
.
dnodeEps
->
dnodeNum
)
{
dnodeReset
Ep
s
(
pEps
);
dnodeWrite
Ep
s
();
dnodeReset
Dnode
s
(
pEps
);
dnodeWrite
Dnode
s
();
}
else
{
int32_t
size
=
pEps
->
dnodeNum
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
);
if
(
memcmp
(
tsDnode
.
dnodeEps
,
pEps
,
size
)
!=
0
)
{
dnodeReset
Ep
s
(
pEps
);
dnodeWrite
Ep
s
();
dnodeReset
Dnode
s
(
pEps
);
dnodeWrite
Dnode
s
();
}
}
...
...
@@ -493,9 +492,9 @@ int32_t dnodeInitDnode() {
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
int32_t
code
=
dnodeRead
Ep
s
();
int32_t
code
=
dnodeRead
Dnode
s
();
if
(
code
!=
0
)
{
dError
(
"failed to read
dnode endpoint file since %s"
,
tstrerror
(
code
));
dError
(
"failed to read
file:%s since %s"
,
tsDnode
.
file
,
tstrerror
(
code
));
return
code
;
}
...
...
source/dnode/mgmt/src/dnodeMnode.c
浏览文件 @
99c71628
...
...
@@ -17,59 +17,264 @@
#include "dnodeMnode.h"
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "cJSON.h"
#include "mnode.h"
int32_t
dnodeInitMnode
()
{
SMnodePara
para
;
para
.
fp
.
GetDnodeEp
=
dnodeGetDnodeEp
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendRedirectMsg
=
dnodeSendRedirectMsg
;
para
.
dnodeId
=
dnodeGetDnodeId
();
para
.
clusterId
=
dnodeGetClusterId
();
static
struct
{
int8_t
deployed
;
int8_t
dropped
;
char
file
[
PATH_MAX
+
20
];
pthread_mutex_t
mutex
;
}
tsMnode
=
{
0
};
return
mnodeInit
(
para
);
}
static
int32_t
dnodeReadMnode
()
{
int32_t
len
=
0
;
int32_t
maxLen
=
300
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
void
dnodeCleanupMnode
()
{
mnodeCleanup
();
}
fp
=
fopen
(
tsMnode
.
file
,
"r"
);
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
tsMnode
.
file
);
goto
PRASE_MNODE_OVER
;
}
static
int32_t
dnodeStartMnode
(
SRpcMsg
*
pMsg
)
{
SCreateMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
mnodeNum
=
htonl
(
pCfg
->
mnodeNum
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
mnodeNum
;
++
i
)
{
pCfg
->
mnodeEps
[
i
].
dnodeId
=
htonl
(
pCfg
->
mnodeEps
[
i
].
dnodeId
);
pCfg
->
mnodeEps
[
i
].
dnodePort
=
htons
(
pCfg
->
mnodeEps
[
i
].
dnodePort
);
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
tsMnode
.
file
);
goto
PRASE_MNODE_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
tsMnode
.
file
);
goto
PRASE_MNODE_OVER
;
}
cJSON
*
deployed
=
cJSON_GetObjectItem
(
root
,
"deployed"
);
if
(
!
deployed
||
deployed
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since deployed not found"
,
tsMnode
.
file
);
goto
PRASE_MNODE_OVER
;
}
tsMnode
.
deployed
=
atoi
(
deployed
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dropped not found"
,
tsMnode
.
file
);
goto
PRASE_MNODE_OVER
;
}
tsMnode
.
dropped
=
atoi
(
dropped
->
valuestring
);
dInfo
(
"succcessed to read file %s"
,
tsMnode
.
file
);
PRASE_MNODE_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
return
0
;
}
static
int32_t
dnodeWriteMnode
()
{
FILE
*
fp
=
fopen
(
tsMnode
.
file
,
"w"
);
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
tsMnode
.
file
,
strerror
(
errno
));
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
300
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
:
\"
%d
\"
,
\n
"
,
tsMnode
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
:
\"
%d
\"
,
\n
"
,
tsMnode
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
dInfo
(
"successed to write %s"
,
tsMnode
.
file
);
return
0
;
}
static
int32_t
dnodeStartMnode
(
SCreateMnodeMsg
*
pCfg
)
{
int32_t
code
=
0
;
if
(
pCfg
->
dnodeId
!=
dnodeGetDnodeId
())
{
dDebug
(
"dnode:%d, in create meps msg is not equal with saved dnodeId:%d"
,
pCfg
->
dnodeId
,
dnodeGetDnodeId
());
return
TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED
;
code
=
TSDB_CODE_DND_DNODE_ID_NOT_MATCHED
;
dError
(
"failed to start mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
if
(
mnodeGetStatus
()
==
MN_STATUS_READY
)
return
0
;
if
(
tsMnode
.
dropped
)
{
code
=
TSDB_CODE_DND_MNODE_ALREADY_DROPPED
;
dError
(
"failed to start mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
if
(
tsMnode
.
deployed
)
{
dError
(
"failed to start mnode since its already deployed"
);
return
0
;
}
return
mnodeDeploy
();
tsMnode
.
deployed
=
1
;
tsMnode
.
dropped
=
0
;
code
=
dnodeWriteMnode
();
if
(
code
!=
0
)
{
tsMnode
.
deployed
=
0
;
dError
(
"failed to start mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
code
=
mnodeDeploy
();
if
(
code
!=
0
)
{
tsMnode
.
deployed
=
0
;
dError
(
"failed to start mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
code
=
mnodeStart
();
if
(
code
!=
0
)
{
tsMnode
.
deployed
=
0
;
dError
(
"failed to start mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
tsMnode
.
deployed
=
1
;
return
0
;
}
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
dnodeStartMnode
(
pMsg
);
static
int32_t
dnodeDropMnode
(
SDropMnodeMsg
*
pCfg
)
{
int32_t
code
=
0
;
if
(
pCfg
->
dnodeId
!=
dnodeGetDnodeId
())
{
code
=
TSDB_CODE_DND_DNODE_ID_NOT_MATCHED
;
dError
(
"failed to drop mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
if
(
tsMnode
.
dropped
)
{
code
=
TSDB_CODE_DND_MNODE_ALREADY_DROPPED
;
dError
(
"failed to drop mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
if
(
!
tsMnode
.
deployed
)
{
dError
(
"failed to drop mnode since not deployed"
);
return
0
;
}
mnodeStop
();
tsMnode
.
deployed
=
0
;
tsMnode
.
dropped
=
1
;
code
=
dnodeWriteMnode
();
if
(
code
!=
0
)
{
tsMnode
.
deployed
=
1
;
tsMnode
.
dropped
=
0
;
dError
(
"failed to drop mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
mnodeUnDeploy
();
tsMnode
.
deployed
=
0
;
return
0
;
}
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
static
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
)
{
SCreateMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
int32_t
code
=
dnodeStartMnode
(
pCfg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
void
dnodeProcessDropMnodeReq
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
dnodeStartMnode
(
pMsg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
static
void
dnodeProcessDropMnodeReq
(
SRpcMsg
*
pMsg
)
{
SDropMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
int32_t
code
=
dnodeDropMnode
(
pCfg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
static
bool
dnodeNeedDeployMnode
()
{
if
(
dnodeGetDnodeId
()
>
0
)
return
false
;
if
(
dnodeGetClusterId
()
>
0
)
return
false
;
if
(
strcmp
(
tsFirst
,
tsLocalEp
)
!=
0
)
return
false
;
return
true
;
}
int32_t
dnodeInitMnode
()
{
tsMnode
.
dropped
=
0
;
tsMnode
.
deployed
=
0
;
snprintf
(
tsMnode
.
file
,
sizeof
(
tsMnode
.
file
),
"%s/mnode.json"
,
tsDnodeDir
);
SMnodePara
para
;
para
.
fp
.
GetDnodeEp
=
dnodeGetDnodeEp
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendRedirectMsg
=
dnodeSendRedirectMsg
;
para
.
dnodeId
=
dnodeGetDnodeId
();
para
.
clusterId
=
dnodeGetClusterId
();
int32_t
code
=
mnodeInit
(
para
);
if
(
code
!=
0
)
{
dError
(
"failed to init mnode module since %s"
,
tstrerror
(
code
));
return
code
;
}
code
=
dnodeReadMnode
();
if
(
code
!=
0
)
{
dError
(
"failed to read file:%s since %s"
,
tsMnode
.
file
,
tstrerror
(
code
));
return
code
;
}
if
(
tsMnode
.
dropped
)
{
dError
(
"mnode already dropped, undeploy it"
);
mnodeUnDeploy
();
return
0
;
}
if
(
!
tsMnode
.
deployed
)
{
bool
needDeploy
=
dnodeNeedDeployMnode
();
if
(
needDeploy
)
{
code
=
mnodeDeploy
();
}
else
{
return
0
;
}
if
(
code
!=
0
)
{
dError
(
"failed to deploy mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
tsMnode
.
deployed
=
1
;
}
return
mnodeStart
();
}
void
dnodeCleanupMnode
()
{
if
(
tsMnode
.
deployed
)
{
mnodeStop
();
}
mnodeCleanup
();
}
void
dnodeProcessMnodeMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
switch
(
pMsg
->
msgType
)
{
case
TSDB_MSG_TYPE_CREATE_MNODE_IN
:
...
...
source/dnode/mnode/inc/mnodeInt.h
浏览文件 @
99c71628
...
...
@@ -22,6 +22,8 @@
extern
"C"
{
#endif
typedef
enum
{
MN_STATUS_UNINIT
=
0
,
MN_STATUS_INIT
=
1
,
MN_STATUS_READY
=
2
,
MN_STATUS_CLOSING
=
3
}
EMnStatus
;
tmr_h
mnodeGetTimer
();
int32_t
mnodeGetDnodeId
();
int64_t
mnodeGetClusterId
();
...
...
source/dnode/mnode/src/mondeInt.c
浏览文件 @
99c71628
...
...
@@ -250,3 +250,6 @@ void mnodeCleanup() {
mInfo
(
"mnode is cleaned up"
);
}
}
int32_t
mnodeStart
()
{
return
0
;
}
void
mnodeStop
()
{}
\ No newline at end of file
source/dnode/vnode/impl/src/vnodeInt.c
浏览文件 @
99c71628
...
...
@@ -30,4 +30,4 @@ int32_t vnodeDrop(SVnode *pVnode) { return 0; }
int32_t
vnodeCompact
(
SVnode
*
pVnode
)
{
return
0
;
}
int32_t
vnodeSync
(
SVnode
*
pVnode
)
{
return
0
;
}
int32_t
vnodeProcessMsg
(
SVnode
*
pVnode
,
SVnodeMsg
*
pMsg
)
{
return
0
;
}
void
vnodeProcessMsg
(
SVnode
*
pVnode
,
SVnodeMsg
*
pMsg
)
{
}
source/util/src/terror.c
浏览文件 @
99c71628
...
...
@@ -228,6 +228,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists"
// dnode
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_MSG_NOT_PROCESSED
,
"Message not processed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_OUT_OF_MEMORY
,
"Dnode out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_DNODE_ID_NOT_MATCHED
,
"Dnode Id not matched"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_MNODE_ALREADY_DROPPED
,
"Mnode already deployed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_NO_WRITE_ACCESS
,
"No permission for disk files in dnode"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_INVALID_MSG_LEN
,
"Invalid message length"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_ACTION_IN_PROGRESS
,
"Action in progress"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录