Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
73b7bef1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
73b7bef1
编写于
10月 25, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-10430] refact dnode module
上级
78001a28
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
810 addition
and
981 deletion
+810
-981
include/common/taosmsg.h
include/common/taosmsg.h
+23
-46
include/server/dnode/dnode.h
include/server/dnode/dnode.h
+1
-1
include/server/mnode/mnode.h
include/server/mnode/mnode.h
+1
-1
source/client/CMakeLists.txt
source/client/CMakeLists.txt
+1
-1
source/client/src/client.c
source/client/src/client.c
+2
-0
source/libs/sync/src/sync.c
source/libs/sync/src/sync.c
+4
-1
source/libs/wal/src/wal.c
source/libs/wal/src/wal.c
+6
-1
source/server/dnode/CMakeLists.txt
source/server/dnode/CMakeLists.txt
+3
-0
source/server/dnode/inc/dnodeDnodeEps.h
source/server/dnode/inc/dnodeDnodeEps.h
+0
-34
source/server/dnode/inc/dnodeEps.h
source/server/dnode/inc/dnodeEps.h
+13
-8
source/server/dnode/inc/dnodeInt.h
source/server/dnode/inc/dnodeInt.h
+7
-0
source/server/dnode/inc/dnodeMain.h
source/server/dnode/inc/dnodeMain.h
+0
-47
source/server/dnode/inc/dnodeMnodeEps.h
source/server/dnode/inc/dnodeMnodeEps.h
+0
-36
source/server/dnode/inc/dnodeMsg.h
source/server/dnode/inc/dnodeMsg.h
+5
-2
source/server/dnode/src/dnodeCfg.c
source/server/dnode/src/dnodeCfg.c
+0
-178
source/server/dnode/src/dnodeEps.c
source/server/dnode/src/dnodeEps.c
+415
-0
source/server/dnode/src/dnodeInt.c
source/server/dnode/src/dnodeInt.c
+150
-41
source/server/dnode/src/dnodeMain.c
source/server/dnode/src/dnodeMain.c
+0
-267
source/server/dnode/src/dnodeMnodeEps.c
source/server/dnode/src/dnodeMnodeEps.c
+0
-311
source/server/dnode/src/dnodeMsg.c
source/server/dnode/src/dnodeMsg.c
+174
-0
source/server/dnode/src/dnodeTrans.c
source/server/dnode/src/dnodeTrans.c
+5
-6
未找到文件。
include/common/taosmsg.h
浏览文件 @
73b7bef1
...
@@ -657,15 +657,18 @@ typedef struct SVgroupAccess {
...
@@ -657,15 +657,18 @@ typedef struct SVgroupAccess {
}
SVgroupAccess
;
}
SVgroupAccess
;
typedef
struct
{
typedef
struct
{
int32_t
dnodeId
;
int32_t
dnodeId
;
uint32_t
moduleStatus
;
int8_t
dropped
;
uint32_t
numOfVnodes
;
char
reserved
[
19
];
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
int64_t
clusterId
;
char
reserved
[
16
];
int32_t
numOfDnodes
;
int32_t
numOfVnodes
;
}
SDnodeCfg
;
}
SDnodeCfg
;
typedef
struct
{
typedef
struct
{
int32_t
dnodeId
;
int32_t
dnodeId
;
int8_t
isMnode
;
int8_t
reserved
;
uint16_t
dnodePort
;
uint16_t
dnodePort
;
char
dnodeFqdn
[
TSDB_FQDN_LEN
];
char
dnodeFqdn
[
TSDB_FQDN_LEN
];
}
SDnodeEp
;
}
SDnodeEp
;
...
@@ -676,55 +679,29 @@ typedef struct {
...
@@ -676,55 +679,29 @@ typedef struct {
}
SDnodeEps
;
}
SDnodeEps
;
typedef
struct
{
typedef
struct
{
int32_t
mnodeId
;
int32_t
statusInterval
;
// tsStatusInterval
char
mnodeEp
[
TSDB_EP_LEN
];
int8_t
reserved
[
36
];
}
SMInfo
;
int64_t
checkTime
;
// 1970-01-01 00:00:00.000
char
timezone
[
64
];
// tsTimezone
typedef
struct
SMInfos
{
char
locale
[
TSDB_LOCALE_LEN
];
// tsLocale
int8_t
inUse
;
char
charset
[
TSDB_LOCALE_LEN
];
// tsCharset
int8_t
mnodeNum
;
SMInfo
mnodeInfos
[
TSDB_MAX_REPLICA
];
}
SMInfos
;
typedef
struct
{
int32_t
numOfMnodes
;
// tsNumOfMnodes
int32_t
mnodeEqualVnodeNum
;
// tsMnodeEqualVnodeNum
int32_t
offlineThreshold
;
// tsOfflineThreshold
int32_t
statusInterval
;
// tsStatusInterval
int32_t
maxtablesPerVnode
;
int32_t
maxVgroupsPerDb
;
char
arbitrator
[
TSDB_EP_LEN
];
// tsArbitrator
char
reserve
[
2
];
// to solve arm32 bus error
char
timezone
[
64
];
// tsTimezone
int64_t
checkTime
;
// 1970-01-01 00:00:00.000
char
locale
[
TSDB_LOCALE_LEN
];
// tsLocale
char
charset
[
TSDB_LOCALE_LEN
];
// tsCharset
int8_t
enableBalance
;
// tsEnableBalance
int8_t
flowCtrl
;
int8_t
slaveQuery
;
int8_t
adjustMaster
;
int8_t
reserved
[
4
];
}
SClusterCfg
;
}
SClusterCfg
;
typedef
struct
SStatusMsg
{
typedef
struct
SStatusMsg
{
uint32_t
version
;
uint32_t
version
;
int32_t
dnodeId
;
int32_t
dnodeId
;
uint32_t
lastReboot
;
// time stamp for last reboot
int32_t
openVnodes
;
int32_t
numOfCores
;
float
diskAvailable
;
int8_t
reserved
[
36
];
char
dnodeEp
[
TSDB_EP_LEN
];
char
dnodeEp
[
TSDB_EP_LEN
];
uint32_t
moduleStatus
;
int64_t
clusterId
;
uint32_t
lastReboot
;
// time stamp for last reboot
uint16_t
reserve1
;
// from config file
uint16_t
openVnodes
;
uint16_t
numOfCores
;
float
diskAvailable
;
// GB
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
uint8_t
alternativeRole
;
uint8_t
reserve2
[
15
];
SClusterCfg
clusterCfg
;
SClusterCfg
clusterCfg
;
SVnodeLoad
load
[];
SVnodeLoad
load
[];
}
SStatusMsg
;
}
SStatusMsg
;
typedef
struct
{
typedef
struct
{
SMInfos
mnodes
;
SDnodeCfg
dnodeCfg
;
SDnodeCfg
dnodeCfg
;
SVgroupAccess
vgAccess
[];
SVgroupAccess
vgAccess
[];
}
SStatusRsp
;
}
SStatusRsp
;
...
@@ -860,9 +837,9 @@ typedef struct {
...
@@ -860,9 +837,9 @@ typedef struct {
}
SCreateDnodeMsg
,
SDropDnodeMsg
;
}
SCreateDnodeMsg
,
SDropDnodeMsg
;
typedef
struct
{
typedef
struct
{
int32_t
dnodeId
;
int32_t
dnodeId
;
char
dnodeEp
[
TSDB_EP_LEN
];
// end point, hostname:port
int32_t
mnodeNum
;
S
MInfos
mnodes
;
S
DnodeEp
mnodeEps
[]
;
}
SCreateMnodeMsg
;
}
SCreateMnodeMsg
;
typedef
struct
{
typedef
struct
{
...
...
include/server/dnode/dnode.h
浏览文件 @
73b7bef1
...
@@ -65,7 +65,7 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
...
@@ -65,7 +65,7 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
* @param fqdn, the fqdn of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
* @param port, the port of dnode.
*/
*/
void
dnodeGet
Dnode
Ep
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
include/server/mnode/mnode.h
浏览文件 @
73b7bef1
...
@@ -60,7 +60,7 @@ typedef struct {
...
@@ -60,7 +60,7 @@ typedef struct {
typedef
struct
{
typedef
struct
{
SMnodeFp
fp
;
SMnodeFp
fp
;
char
clusterId
[
TSDB_CLUSTER_ID_LEN
]
;
int64_t
clusterId
;
int32_t
dnodeId
;
int32_t
dnodeId
;
}
SMnodePara
;
}
SMnodePara
;
...
...
source/client/CMakeLists.txt
浏览文件 @
73b7bef1
...
@@ -7,4 +7,4 @@ target_include_directories(
...
@@ -7,4 +7,4 @@ target_include_directories(
target_link_libraries
(
target_link_libraries
(
taos
taos
INTERFACE api
INTERFACE api
)
)
\ No newline at end of file
source/client/src/client.c
浏览文件 @
73b7bef1
...
@@ -19,3 +19,5 @@
...
@@ -19,3 +19,5 @@
//
//
//}
//}
int
taos_init
()
{
return
0
;
}
void
taos_cleanup
(
void
)
{}
source/libs/sync/src/sync.c
浏览文件 @
73b7bef1
...
@@ -13,4 +13,7 @@
...
@@ -13,4 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "sync.h"
#include "sync.h"
\ No newline at end of file
int32_t
syncInit
()
{
return
0
;}
void
syncCleanUp
()
{}
\ No newline at end of file
source/libs/wal/src/wal.c
浏览文件 @
73b7bef1
...
@@ -11,4 +11,9 @@
...
@@ -11,4 +11,9 @@
*
*
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
\ No newline at end of file
#include "wal.h"
int32_t
walInit
()
{
return
0
;}
void
walCleanUp
()
{}
\ No newline at end of file
source/server/dnode/CMakeLists.txt
浏览文件 @
73b7bef1
...
@@ -5,6 +5,9 @@ target_link_libraries(
...
@@ -5,6 +5,9 @@ target_link_libraries(
PUBLIC cjson
PUBLIC cjson
PUBLIC mnode
PUBLIC mnode
PUBLIC vnode
PUBLIC vnode
PUBLIC wal
PUBLIC sync
PUBLIC taos
)
)
target_include_directories
(
target_include_directories
(
dnode
dnode
...
...
source/server/dnode/inc/dnodeDnodeEps.h
已删除
100644 → 0
浏览文件 @
78001a28
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_DNODE_EPS_H_
#define _TD_DNODE_DNODE_EPS_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "dnodeInt.h"
int32_t
dnodeInitDnodeEps
();
void
dnodeCleanupDnodeEps
();
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
);
bool
dnodeIsDnodeEpChanged
(
int32_t
dnodeId
,
char
*
epstr
);
void
dnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DNODE_DNODE_EPS_H_*/
\ No newline at end of file
source/server/dnode/inc/dnode
Cfg
.h
→
source/server/dnode/inc/dnode
Eps
.h
浏览文件 @
73b7bef1
...
@@ -13,25 +13,30 @@
...
@@ -13,25 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#ifndef _TD_DNODE_
CFG
_H_
#ifndef _TD_DNODE_
EPS
_H_
#define _TD_DNODE_
CFG
_H_
#define _TD_DNODE_
EPS
_H_
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
#include "dnodeInt.h"
#include "dnodeInt.h"
int32_t
dnodeInitEps
();
void
dnodeCleanupEps
();
int32_t
dnodeInitCfg
();
void
dnodeCleanupCfg
();
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
);
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
);
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
);
void
dnodeUpdateMnodeEps
(
SRpcEpSet
*
pEpSet
);
int32_t
dnodeGetDnodeId
();
int32_t
dnodeGetDnodeId
();
void
dnodeGetClusterId
(
char
*
clusterId
);
int64_t
dnodeGetClusterId
();
void
dnodeGetCfg
(
int32_t
*
dnodeId
,
char
*
clusterId
);
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
);
void
dnodeSetDropped
();
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
);
void
dnodeGetEpSetForShell
(
SRpcEpSet
*
epSet
);
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
#endif
/*_TD_DNODE_
CFG
_H_*/
#endif
/*_TD_DNODE_
EPS
_H_*/
\ No newline at end of file
source/server/dnode/inc/dnodeInt.h
浏览文件 @
73b7bef1
...
@@ -24,6 +24,7 @@ extern "C" {
...
@@ -24,6 +24,7 @@ extern "C" {
#include "tglobal.h"
#include "tglobal.h"
#include "tlog.h"
#include "tlog.h"
#include "trpc.h"
#include "trpc.h"
#include "ttimer.h"
#include "dnode.h"
#include "dnode.h"
extern
int32_t
dDebugFlag
;
extern
int32_t
dDebugFlag
;
...
@@ -35,6 +36,12 @@ extern int32_t dDebugFlag;
...
@@ -35,6 +36,12 @@ extern int32_t dDebugFlag;
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef
enum
{
DN_RUN_STAT_INIT
,
DN_RUN_STAT_RUNNING
,
DN_RUN_STAT_STOPPED
}
EDnStat
;
EDnStat
dnodeGetRunStat
();
void
dnodeSetRunStat
();
void
dnodeGetStartup
(
SStartupStep
*
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/server/dnode/inc/dnodeMain.h
已删除
100644 → 0
浏览文件 @
78001a28
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_MAIN_H_
#define _TD_DNODE_MAIN_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "dnodeInt.h"
typedef
enum
{
DN_RUN_STAT_INIT
,
DN_RUN_STAT_RUNNING
,
DN_RUN_STAT_STOPPED
}
EDnStat
;
int32_t
dnodeInitMain
();
void
dnodeCleanupMain
();
int32_t
dnodeInitStorage
();
void
dnodeCleanupStorage
();
void
dnodeReportStartup
(
char
*
name
,
char
*
desc
);
void
dnodeReportStartupFinished
(
char
*
name
,
char
*
desc
);
void
dnodeProcessStartupReq
(
SRpcMsg
*
pMsg
);
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
);
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
);
EDnStat
dnodeGetRunStat
();
void
dnodeSetRunStat
();
void
*
dnodeGetTimer
();
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DNODE_MAIN_H_*/
source/server/dnode/inc/dnodeMnodeEps.h
已删除
100644 → 0
浏览文件 @
78001a28
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_MNODE_EP_H_
#define _TD_DNODE_MNODE_EP_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "dnodeInt.h"
int32_t
dnodeInitMnodeEps
();
void
dnodeCleanupMnodeEps
();
void
dnodeUpdateMnodeFromStatus
(
SMInfos
*
pMinfos
);
void
dnodeUpdateMnodeFromPeer
(
SRpcEpSet
*
pEpSet
);
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
);
void
dnodeGetEpSetForShell
(
SRpcEpSet
*
epSet
);
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DNODE_MNODE_EP_H_*/
source/server/dnode/inc/dnode
Status
.h
→
source/server/dnode/inc/dnode
Msg
.h
浏览文件 @
73b7bef1
...
@@ -21,9 +21,12 @@ extern "C" {
...
@@ -21,9 +21,12 @@ extern "C" {
#endif
#endif
#include "dnodeInt.h"
#include "dnodeInt.h"
int32_t
dnodeInit
Status
();
int32_t
dnodeInit
Msg
();
void
dnodeCleanup
Status
();
void
dnodeCleanup
Msg
();
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
);
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
);
void
dnodeProcessStartupReq
(
SRpcMsg
*
pMsg
);
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
);
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/server/dnode/src/dnodeCfg.c
已删除
100644 → 0
浏览文件 @
78001a28
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "dnodeCfg.h"
static
struct
DnCfg
{
int32_t
dnodeId
;
int32_t
dropped
;
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
char
file
[
PATH_MAX
+
20
];
pthread_mutex_t
mutex
;
}
tsDcfg
;
static
int32_t
dnodeReadCfg
()
{
int32_t
len
=
0
;
int32_t
maxLen
=
200
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
fp
=
fopen
(
tsDcfg
.
file
,
"r"
);
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
tsDcfg
.
file
);
goto
PARSE_CFG_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
tsDcfg
.
file
);
goto
PARSE_CFG_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
tsDcfg
.
file
);
goto
PARSE_CFG_OVER
;
}
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
root
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dnodeId not found"
,
tsDcfg
.
file
);
goto
PARSE_CFG_OVER
;
}
tsDcfg
.
dnodeId
=
(
int32_t
)
dnodeId
->
valueint
;
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
tsDcfg
.
file
);
goto
PARSE_CFG_OVER
;
}
tsDcfg
.
dropped
=
(
int32_t
)
dropped
->
valueint
;
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterId"
);
if
(
!
clusterId
||
clusterId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since clusterId not found"
,
tsDcfg
.
file
);
goto
PARSE_CFG_OVER
;
}
tstrncpy
(
tsDcfg
.
clusterId
,
clusterId
->
valuestring
,
TSDB_CLUSTER_ID_LEN
);
dInfo
(
"successed to read %s"
,
tsDcfg
.
file
);
PARSE_CFG_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
terrno
=
0
;
return
0
;
}
static
int32_t
dnodeWriteCfg
()
{
FILE
*
fp
=
fopen
(
tsDcfg
.
file
,
"w"
);
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
tsDcfg
.
file
,
strerror
(
errno
));
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
200
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
: %d,
\n
"
,
tsDcfg
.
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d,
\n
"
,
tsDcfg
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%s
\"\n
"
,
tsDcfg
.
clusterId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
dInfo
(
"successed to write %s"
,
tsDcfg
.
file
);
return
0
;
}
int32_t
dnodeInitCfg
()
{
tsDcfg
.
dnodeId
=
0
;
tsDcfg
.
dropped
=
0
;
tsDcfg
.
clusterId
[
0
]
=
0
;
snprintf
(
tsDcfg
.
file
,
sizeof
(
tsDcfg
.
file
),
"%s/dnodeCfg.json"
,
tsDnodeDir
);
pthread_mutex_init
(
&
tsDcfg
.
mutex
,
NULL
);
int32_t
ret
=
dnodeReadCfg
();
if
(
ret
==
0
)
{
dInfo
(
"dnode cfg is initialized"
);
}
if
(
tsDcfg
.
dropped
)
{
dInfo
(
"dnode is dropped and start to exit"
);
return
-
1
;
}
return
ret
;
}
void
dnodeCleanupCfg
()
{
pthread_mutex_destroy
(
&
tsDcfg
.
mutex
);
}
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
)
{
if
(
tsDcfg
.
dnodeId
!=
0
)
return
;
pthread_mutex_lock
(
&
tsDcfg
.
mutex
);
tsDcfg
.
dnodeId
=
data
->
dnodeId
;
tstrncpy
(
tsDcfg
.
clusterId
,
data
->
clusterId
,
TSDB_CLUSTER_ID_LEN
);
dInfo
(
"dnodeId is set to %d, clusterId is set to %s"
,
data
->
dnodeId
,
data
->
clusterId
);
dnodeWriteCfg
();
pthread_mutex_unlock
(
&
tsDcfg
.
mutex
);
}
void
dnodeSetDropped
()
{
pthread_mutex_lock
(
&
tsDcfg
.
mutex
);
tsDcfg
.
dropped
=
1
;
dnodeWriteCfg
();
pthread_mutex_unlock
(
&
tsDcfg
.
mutex
);
}
int32_t
dnodeGetDnodeId
()
{
int32_t
dnodeId
=
0
;
pthread_mutex_lock
(
&
tsDcfg
.
mutex
);
dnodeId
=
tsDcfg
.
dnodeId
;
pthread_mutex_unlock
(
&
tsDcfg
.
mutex
);
return
dnodeId
;
}
void
dnodeGetClusterId
(
char
*
clusterId
)
{
pthread_mutex_lock
(
&
tsDcfg
.
mutex
);
tstrncpy
(
clusterId
,
tsDcfg
.
clusterId
,
TSDB_CLUSTER_ID_LEN
);
pthread_mutex_unlock
(
&
tsDcfg
.
mutex
);
}
void
dnodeGetCfg
(
int32_t
*
dnodeId
,
char
*
clusterId
)
{
pthread_mutex_lock
(
&
tsDcfg
.
mutex
);
*
dnodeId
=
tsDcfg
.
dnodeId
;
tstrncpy
(
clusterId
,
tsDcfg
.
clusterId
,
TSDB_CLUSTER_ID_LEN
);
pthread_mutex_unlock
(
&
tsDcfg
.
mutex
);
}
source/server/dnode/src/dnode
Dnode
Eps.c
→
source/server/dnode/src/dnodeEps.c
浏览文件 @
73b7bef1
...
@@ -14,131 +14,250 @@
...
@@ -14,131 +14,250 @@
*/
*/
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "
o
s.h"
#include "
dnodeEp
s.h"
#include "cJSON.h"
#include "cJSON.h"
#include "thash.h"
#include "thash.h"
#include "tglobal.h"
#include "dnodeDnodeEps.h"
#include "dnodeCfg.h"
static
struct
{
static
struct
{
int32_t
dnodeId
;
int32_t
dnodeId
;
int32_t
dnodeNum
;
int32_t
dropped
;
SDnodeEp
*
dnodeList
;
int64_t
clusterId
;
SHashObj
*
dnodeHash
;
SDnodeEps
*
dnodeEps
;
char
file
[
PATH_MAX
+
20
];
SHashObj
*
dnodeHash
;
SRpcEpSet
mnodeEpSetForShell
;
SRpcEpSet
mnodeEpSetForPeer
;
char
file
[
PATH_MAX
+
20
];
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
}
tsDeps
;
}
tsEps
;
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsEps
.
mutex
);
*
epSet
=
tsEps
.
mnodeEpSetForPeer
;
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
}
void
dnodeGetEpSetForShell
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsEps
.
mutex
);
*
epSet
=
tsEps
.
mnodeEpSetForShell
;
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
}
void
dnodeUpdateMnodeEps
(
SRpcEpSet
*
ep
)
{
if
(
ep
!=
NULL
||
ep
->
numOfEps
<=
0
)
{
dError
(
"mnode is changed, but content is invalid, discard it"
);
return
;
}
pthread_mutex_lock
(
&
tsEps
.
mutex
);
dInfo
(
"mnode is changed, num:%d use:%d"
,
ep
->
numOfEps
,
ep
->
inUse
);
tsEps
.
mnodeEpSetForPeer
=
*
ep
;
for
(
int32_t
i
=
0
;
i
<
ep
->
numOfEps
;
++
i
)
{
ep
->
port
[
i
]
-=
TSDB_PORT_DNODEDNODE
;
dInfo
(
"mnode index:%d %s:%u"
,
i
,
ep
->
fqdn
[
i
],
ep
->
port
[
i
]);
}
tsEps
.
mnodeEpSetForShell
=
*
ep
;
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
}
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
)
{
SRpcConnInfo
connInfo
=
{
0
};
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
);
SRpcEpSet
epSet
=
{
0
};
if
(
forShell
)
{
dnodeGetEpSetForShell
(
&
epSet
);
}
else
{
dnodeGetEpSetForPeer
(
&
epSet
);
}
dDebug
(
"msg:%s will be redirected, num:%d use:%d"
,
taosMsg
[
rpcMsg
->
msgType
],
epSet
.
numOfEps
,
epSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
dDebug
(
"mnode index:%d %s:%d"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
]);
if
(
strcmp
(
epSet
.
fqdn
[
i
],
tsLocalFqdn
)
==
0
)
{
if
((
epSet
.
port
[
i
]
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
&&
!
forShell
)
||
(
epSet
.
port
[
i
]
==
tsServerPort
&&
forShell
))
{
epSet
.
inUse
=
(
i
+
1
)
%
epSet
.
numOfEps
;
dDebug
(
"mnode index:%d %s:%d set inUse to %d"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
],
epSet
.
inUse
);
}
}
epSet
.
port
[
i
]
=
htons
(
epSet
.
port
[
i
]);
}
rpcSendRedirectRsp
(
rpcMsg
->
handle
,
&
epSet
);
}
static
void
dnodePrintEps
()
{
static
void
dnodePrintEps
()
{
dDebug
(
"print dnode
Ep, dnodeNum:%d"
,
tsDeps
.
dnodeNum
);
dDebug
(
"print dnode
list, num:%d"
,
tsEps
.
dnodeEps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
ts
Deps
.
dnodeNum
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
ts
Eps
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
ts
Deps
.
dnodeList
[
i
];
SDnodeEp
*
ep
=
&
ts
Eps
.
dnodeEps
->
dnodeEps
[
i
];
dDebug
(
"dnode:%d,
dnodeFqdn:%s dnodePort:%u"
,
ep
->
dnodeId
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
dDebug
(
"dnode:%d,
fqdn:%s port:%u isMnode:%d"
,
ep
->
dnodeId
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
,
ep
->
isMnode
);
}
}
}
}
static
void
dnodeResetEps
(
SDnodeEps
*
data
)
{
static
void
dnodeResetEps
(
SDnodeEps
*
data
)
{
assert
(
data
!=
NULL
);
assert
(
data
!=
NULL
);
if
(
data
->
dnodeNum
>
tsDeps
.
dnodeNum
)
{
int32_t
size
=
sizeof
(
SDnodeEps
)
+
data
->
dnodeNum
*
sizeof
(
SDnodeEp
);
SDnodeEp
*
tmp
=
calloc
(
data
->
dnodeNum
,
sizeof
(
SDnodeEp
));
if
(
data
->
dnodeNum
>
tsEps
.
dnodeEps
->
dnodeNum
)
{
SDnodeEps
*
tmp
=
calloc
(
1
,
size
);
if
(
tmp
==
NULL
)
return
;
if
(
tmp
==
NULL
)
return
;
tfree
(
tsDeps
.
dnodeList
);
tfree
(
tsEps
.
dnodeEps
);
tsDeps
.
dnodeList
=
tmp
;
tsEps
.
dnodeEps
=
tmp
;
tsDeps
.
dnodeNum
=
data
->
dnodeNum
;
}
memcpy
(
tsDeps
.
dnodeList
,
data
->
dnodeEps
,
tsDeps
.
dnodeNum
*
sizeof
(
SDnodeEp
));
dnodePrintEps
();
if
(
tsEps
.
dnodeEps
!=
data
)
{
memcpy
(
tsEps
.
dnodeEps
,
data
,
size
);
}
for
(
int32_t
i
=
0
;
i
<
tsDeps
.
dnodeNum
;
++
i
)
{
tsEps
.
mnodeEpSetForPeer
.
inUse
=
0
;
SDnodeEp
*
ep
=
&
tsDeps
.
dnodeList
[
i
];
tsEps
.
mnodeEpSetForShell
.
inUse
=
0
;
taosHashPut
(
tsDeps
.
dnodeHash
,
&
ep
->
dnodeId
,
sizeof
(
int32_t
),
ep
,
sizeof
(
SDnodeEp
));
int32_t
index
=
0
;
}
for
(
int32_t
i
=
0
;
i
<
tsEps
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
tsEps
.
dnodeEps
->
dnodeEps
[
i
];
if
(
!
ep
->
isMnode
)
continue
;
if
(
index
>=
TSDB_MAX_REPLICA
)
continue
;
strcpy
(
tsEps
.
mnodeEpSetForShell
.
fqdn
[
index
],
ep
->
dnodeFqdn
);
strcpy
(
tsEps
.
mnodeEpSetForPeer
.
fqdn
[
index
],
ep
->
dnodeFqdn
);
tsEps
.
mnodeEpSetForShell
.
port
[
index
]
=
ep
->
dnodePort
;
tsEps
.
mnodeEpSetForShell
.
port
[
index
]
=
ep
->
dnodePort
+
tsDnodeDnodePort
;
index
++
;
}
for
(
int32_t
i
=
0
;
i
<
tsEps
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
tsEps
.
dnodeEps
->
dnodeEps
[
i
];
taosHashPut
(
tsEps
.
dnodeHash
,
&
ep
->
dnodeId
,
sizeof
(
int32_t
),
ep
,
sizeof
(
SDnodeEp
));
}
}
dnodePrintEps
();
}
static
bool
dnodeIsDnodeEpChanged
(
int32_t
dnodeId
,
char
*
epstr
)
{
bool
changed
=
false
;
pthread_mutex_lock
(
&
tsEps
.
mutex
);
SDnodeEp
*
ep
=
taosHashGet
(
tsEps
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
ep
!=
NULL
)
{
char
epSaved
[
TSDB_EP_LEN
+
1
];
snprintf
(
epSaved
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
changed
=
strcmp
(
epstr
,
epSaved
)
!=
0
;
tstrncpy
(
epstr
,
epSaved
,
TSDB_EP_LEN
);
}
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
return
changed
;
}
}
static
int32_t
dnodeReadEps
()
{
static
int32_t
dnodeReadEps
()
{
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
FILE
*
fp
=
NULL
;
fp
=
fopen
(
ts
De
ps
.
file
,
"r"
);
fp
=
fopen
(
ts
E
ps
.
file
,
"r"
);
if
(
!
fp
)
{
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
ts
De
ps
.
file
);
dDebug
(
"file %s not exist"
,
ts
E
ps
.
file
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
ts
De
ps
.
file
);
dError
(
"failed to read %s since content is null"
,
ts
E
ps
.
file
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
content
[
len
]
=
0
;
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
tsDeps
.
file
);
dError
(
"failed to read %s since invalid json format"
,
tsEps
.
file
);
goto
PRASE_EPS_OVER
;
}
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
root
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dnodeId not found"
,
tsEps
.
file
);
goto
PRASE_EPS_OVER
;
}
tsEps
.
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dropped not found"
,
tsEps
.
file
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
tsEps
.
dropped
=
atoi
(
dropped
->
valuestring
);
cJSON
*
dnodeNum
=
cJSON_GetObjectItem
(
root
,
"dnodeNum
"
);
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterId
"
);
if
(
!
dnodeNum
||
dnodeNum
->
type
!=
cJSON_Number
)
{
if
(
!
clusterId
||
clusterId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since
dnodeNum not found"
,
tsDe
ps
.
file
);
dError
(
"failed to read %s since
clusterId not found"
,
tsE
ps
.
file
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
tsEps
.
clusterId
=
atoll
(
clusterId
->
valuestring
);
cJSON
*
dnodeInfos
=
cJSON_GetObjectItem
(
root
,
"dnodeInfos"
);
cJSON
*
dnodeInfos
=
cJSON_GetObjectItem
(
root
,
"dnodeInfos"
);
if
(
!
dnodeInfos
||
dnodeInfos
->
type
!=
cJSON_Array
)
{
if
(
!
dnodeInfos
||
dnodeInfos
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since dnodeInfos not found"
,
ts
De
ps
.
file
);
dError
(
"failed to read %s since dnodeInfos not found"
,
ts
E
ps
.
file
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
int32_t
dnodeInfosSize
=
cJSON_GetArraySize
(
dnodeInfos
);
int32_t
dnodeInfosSize
=
cJSON_GetArraySize
(
dnodeInfos
);
if
(
dnodeInfosSize
!=
dnodeNum
->
valueint
)
{
if
(
dnodeInfosSize
<=
0
)
{
dError
(
"failed to read %s since dnodeInfos size:%d not matched dnodeNum:%d"
,
tsDeps
.
file
,
dnodeInfosSize
,
dError
(
"failed to read %s since dnodeInfos size:%d invalid"
,
tsEps
.
file
,
dnodeInfosSize
);
(
int32_t
)
dnodeNum
->
valueint
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
tsDeps
.
dnodeNum
=
dnodeInfosSize
;
tsEps
.
dnodeEps
=
calloc
(
1
,
dnodeInfosSize
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
));
tsDeps
.
dnodeList
=
calloc
(
dnodeInfosSize
,
sizeof
(
SDnodeEp
));
if
(
tsEps
.
dnodeEps
==
NULL
)
{
if
(
tsDeps
.
dnodeList
==
NULL
)
{
dError
(
"failed to calloc dnodeEpList since %s"
,
strerror
(
errno
));
dError
(
"failed to calloc dnodeEpList since %s"
,
strerror
(
errno
));
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
tsEps
.
dnodeEps
->
dnodeNum
=
dnodeInfosSize
;
for
(
int32_t
i
=
0
;
i
<
dnodeInfosSize
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
dnodeInfosSize
;
++
i
)
{
cJSON
*
dnodeInfo
=
cJSON_GetArrayItem
(
dnodeInfos
,
i
);
cJSON
*
dnodeInfo
=
cJSON_GetArrayItem
(
dnodeInfos
,
i
);
if
(
dnodeInfo
==
NULL
)
break
;
if
(
dnodeInfo
==
NULL
)
break
;
SDnodeEp
*
ep
=
&
ts
Deps
.
dnodeList
[
i
];
SDnodeEp
*
ep
=
&
ts
Eps
.
dnodeEps
->
dnodeEps
[
i
];
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeId"
);
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_Number
)
{
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodeId not found"
,
tsDeps
.
file
);
dError
(
"failed to read %s, dnodeId not found"
,
tsEps
.
file
);
goto
PRASE_EPS_OVER
;
}
ep
->
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
isMnode
=
cJSON_GetObjectItem
(
dnodeInfo
,
"isMnode"
);
if
(
!
isMnode
||
isMnode
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, isMnode not found"
,
tsEps
.
file
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
ep
->
dnodeId
=
(
int32_t
)
dnodeId
->
valueint
;
ep
->
isMnode
=
atoi
(
isMnode
->
valuestring
)
;
cJSON
*
dnodeFqdn
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeFqdn"
);
cJSON
*
dnodeFqdn
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeFqdn"
);
if
(
!
dnodeFqdn
||
dnodeFqdn
->
type
!=
cJSON_String
||
dnodeFqdn
->
valuestring
==
NULL
)
{
if
(
!
dnodeFqdn
||
dnodeFqdn
->
type
!=
cJSON_String
||
dnodeFqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s, dnodeFqdn not found"
,
ts
De
ps
.
file
);
dError
(
"failed to read %s, dnodeFqdn not found"
,
ts
E
ps
.
file
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
tstrncpy
(
ep
->
dnodeFqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
tstrncpy
(
ep
->
dnodeFqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
dnodePort
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodePort"
);
cJSON
*
dnodePort
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodePort"
);
if
(
!
dnodePort
||
dnodePort
->
type
!=
cJSON_
Number
)
{
if
(
!
dnodePort
||
dnodePort
->
type
!=
cJSON_
String
)
{
dError
(
"failed to read %s, dnodePort not found"
,
ts
De
ps
.
file
);
dError
(
"failed to read %s, dnodePort not found"
,
ts
E
ps
.
file
);
goto
PRASE_EPS_OVER
;
goto
PRASE_EPS_OVER
;
}
}
ep
->
dnodePort
=
(
uint16_t
)
dnodePort
->
valueint
;
ep
->
dnodePort
=
atoi
(
dnodePort
->
valuestring
)
;
}
}
dInfo
(
"succcessed to read file %s"
,
ts
De
ps
.
file
);
dInfo
(
"succcessed to read file %s"
,
ts
E
ps
.
file
);
dnodePrintEps
();
dnodePrintEps
();
PRASE_EPS_OVER:
PRASE_EPS_OVER:
...
@@ -146,35 +265,40 @@ PRASE_EPS_OVER:
...
@@ -146,35 +265,40 @@ PRASE_EPS_OVER:
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
if
(
dnodeIsDnodeEpChanged
(
ts
De
ps
.
dnodeId
,
tsLocalEp
))
{
if
(
dnodeIsDnodeEpChanged
(
ts
E
ps
.
dnodeId
,
tsLocalEp
))
{
dError
(
"dnode:%d, localEp
different from %s dnodeEps.json and need reconfigured"
,
tsDe
ps
.
dnodeId
,
tsLocalEp
);
dError
(
"dnode:%d, localEp
%s different with dnodeEps.json and need reconfigured"
,
tsE
ps
.
dnodeId
,
tsLocalEp
);
return
-
1
;
return
-
1
;
}
}
dnodeResetEps
(
tsEps
.
dnodeEps
);
terrno
=
0
;
terrno
=
0
;
return
0
;
return
0
;
}
}
static
int32_t
dnodeWriteEps
()
{
static
int32_t
dnodeWriteEps
()
{
FILE
*
fp
=
fopen
(
ts
De
ps
.
file
,
"w"
);
FILE
*
fp
=
fopen
(
ts
E
ps
.
file
,
"w"
);
if
(
!
fp
)
{
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
ts
De
ps
.
file
,
strerror
(
errno
));
dError
(
"failed to write %s since %s"
,
ts
E
ps
.
file
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
}
}
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeNum
\"
: %d,
\n
"
,
tsDeps
.
dnodeNum
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
tsEps
.
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
:
\"
%d
\"
,
\n
"
,
tsEps
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
tsEps
.
clusterId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeInfos
\"
: [{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeInfos
\"
: [{
\n
"
);
for
(
int32_t
i
=
0
;
i
<
tsDeps
.
dnodeNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
tsEps
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
tsDeps
.
dnodeList
[
i
];
SDnodeEp
*
ep
=
&
tsEps
.
dnodeEps
->
dnodeEps
[
i
];
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
: %d,
\n
"
,
ep
->
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
ep
->
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
isMnode
\"
:
\"
%d
\"
,
\n
"
,
ep
->
isMnode
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeFqdn
\"
:
\"
%s
\"
,
\n
"
,
ep
->
dnodeFqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeFqdn
\"
:
\"
%s
\"
,
\n
"
,
ep
->
dnodeFqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodePort
\"
:
%u
\n
"
,
ep
->
dnodePort
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodePort
\"
:
\"
%u
\"
\n
"
,
ep
->
dnodePort
);
if
(
i
<
ts
Deps
.
dnodeNum
-
1
)
{
if
(
i
<
ts
Eps
.
dnodeEps
->
dnodeNum
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }]
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }]
\n
"
);
...
@@ -188,18 +312,20 @@ static int32_t dnodeWriteEps() {
...
@@ -188,18 +312,20 @@ static int32_t dnodeWriteEps() {
free
(
content
);
free
(
content
);
terrno
=
0
;
terrno
=
0
;
dInfo
(
"successed to write %s"
,
ts
De
ps
.
file
);
dInfo
(
"successed to write %s"
,
ts
E
ps
.
file
);
return
0
;
return
0
;
}
}
int32_t
dnodeInitDnodeEps
()
{
int32_t
dnodeInitEps
()
{
tsDeps
.
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
tsEps
.
dnodeId
=
0
;
if
(
tsDeps
.
dnodeHash
==
NULL
)
return
-
1
;
tsEps
.
dropped
=
0
;
tsEps
.
clusterId
=
0
;
tsEps
.
dnodeEps
=
NULL
;
snprintf
(
tsEps
.
file
,
sizeof
(
tsEps
.
file
),
"%s/dnodeEps.json"
,
tsDnodeDir
);
pthread_mutex_init
(
&
tsEps
.
mutex
,
NULL
);
tsDeps
.
dnodeId
=
dnodeGetDnodeId
();
tsEps
.
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
tsDeps
.
dnodeNum
=
0
;
if
(
tsEps
.
dnodeHash
==
NULL
)
return
-
1
;
snprintf
(
tsDeps
.
file
,
sizeof
(
tsDeps
.
file
),
"%s/dnodeEps.json"
,
tsDnodeDir
);
pthread_mutex_init
(
&
tsDeps
.
mutex
,
NULL
);
int32_t
ret
=
dnodeReadEps
();
int32_t
ret
=
dnodeReadEps
();
if
(
ret
==
0
)
{
if
(
ret
==
0
)
{
...
@@ -209,76 +335,81 @@ int32_t dnodeInitDnodeEps() {
...
@@ -209,76 +335,81 @@ int32_t dnodeInitDnodeEps() {
return
ret
;
return
ret
;
}
}
void
dnodeCleanup
Dnode
Eps
()
{
void
dnodeCleanupEps
()
{
pthread_mutex_lock
(
&
ts
De
ps
.
mutex
);
pthread_mutex_lock
(
&
ts
E
ps
.
mutex
);
if
(
ts
Deps
.
dnodeList
!=
NULL
)
{
if
(
ts
Eps
.
dnodeEps
!=
NULL
)
{
free
(
ts
Deps
.
dnodeList
);
free
(
ts
Eps
.
dnodeEps
);
ts
Deps
.
dnodeList
=
NULL
;
ts
Eps
.
dnodeEps
=
NULL
;
}
}
if
(
ts
De
ps
.
dnodeHash
)
{
if
(
ts
E
ps
.
dnodeHash
)
{
taosHashCleanup
(
ts
De
ps
.
dnodeHash
);
taosHashCleanup
(
ts
E
ps
.
dnodeHash
);
ts
De
ps
.
dnodeHash
=
NULL
;
ts
E
ps
.
dnodeHash
=
NULL
;
}
}
tsDeps
.
dnodeNum
=
0
;
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
pthread_mutex_unlock
(
&
tsDeps
.
mutex
);
pthread_mutex_destroy
(
&
tsEps
.
mutex
);
pthread_mutex_destroy
(
&
tsDeps
.
mutex
);
}
}
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
)
{
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
)
{
if
(
data
==
NULL
||
data
->
dnodeNum
<=
0
)
return
;
if
(
data
==
NULL
||
data
->
dnodeNum
<=
0
)
return
;
data
->
dnodeNum
=
htonl
(
data
->
dnodeNum
);
pthread_mutex_lock
(
&
tsEps
.
mutex
);
for
(
int32_t
i
=
0
;
i
<
data
->
dnodeNum
;
++
i
)
{
data
->
dnodeEps
[
i
].
dnodeId
=
htonl
(
data
->
dnodeEps
[
i
].
dnodeId
);
data
->
dnodeEps
[
i
].
dnodePort
=
htons
(
data
->
dnodeEps
[
i
].
dnodePort
);
}
pthread_mutex_lock
(
&
tsDeps
.
mutex
);
if
(
data
->
dnodeNum
!=
ts
Deps
.
dnodeNum
)
{
if
(
data
->
dnodeNum
!=
ts
Eps
.
dnodeEps
->
dnodeNum
)
{
dnodeResetEps
(
data
);
dnodeResetEps
(
data
);
dnodeWriteEps
();
dnodeWriteEps
();
}
else
{
}
else
{
int32_t
size
=
data
->
dnodeNum
*
sizeof
(
SDnodeEp
);
int32_t
size
=
data
->
dnodeNum
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
)
;
if
(
memcmp
(
ts
Deps
.
dnodeList
,
data
->
dnodeEps
,
size
)
!=
0
)
{
if
(
memcmp
(
ts
Eps
.
dnodeEps
,
data
,
size
)
!=
0
)
{
dnodeResetEps
(
data
);
dnodeResetEps
(
data
);
dnodeWriteEps
();
dnodeWriteEps
();
}
}
}
}
pthread_mutex_unlock
(
&
ts
De
ps
.
mutex
);
pthread_mutex_unlock
(
&
ts
E
ps
.
mutex
);
}
}
bool
dnodeIsDnodeEpChanged
(
int32_t
dnodeId
,
char
*
epstr
)
{
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
)
{
bool
changed
=
false
;
pthread_mutex_lock
(
&
tsEps
.
mutex
);
pthread_mutex_lock
(
&
tsDeps
.
mutex
);
SDnodeEp
*
ep
=
taosHashGet
(
ts
De
ps
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
SDnodeEp
*
ep
=
taosHashGet
(
ts
E
ps
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
ep
!=
NULL
)
{
if
(
ep
!=
NULL
)
{
char
epSaved
[
TSDB_EP_LEN
+
1
];
if
(
port
)
*
port
=
ep
->
dnodePort
;
snprintf
(
epSaved
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
if
(
fqdn
)
tstrncpy
(
fqdn
,
ep
->
dnodeFqdn
,
TSDB_FQDN_LEN
);
changed
=
strcmp
(
epstr
,
epSaved
)
!=
0
;
if
(
epstr
)
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
tstrncpy
(
epstr
,
epSaved
,
TSDB_EP_LEN
);
}
}
pthread_mutex_unlock
(
&
tsDeps
.
mutex
);
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
return
changed
;
}
}
void
dnode
GetDnodeEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
)
{
void
dnode
UpdateCfg
(
SDnodeCfg
*
data
)
{
pthread_mutex_lock
(
&
tsDeps
.
mutex
)
;
if
(
tsEps
.
dnodeId
!=
0
&&
!
data
->
dropped
)
return
;
SDnodeEp
*
ep
=
taosHashGet
(
tsDeps
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
pthread_mutex_lock
(
&
tsEps
.
mutex
);
if
(
ep
!=
NULL
)
{
if
(
port
)
*
port
=
ep
->
dnodePort
;
tsEps
.
dnodeId
=
data
->
dnodeId
;
if
(
fqdn
)
tstrncpy
(
fqdn
,
ep
->
dnodeFqdn
,
TSDB_FQDN_LEN
);
tsEps
.
clusterId
=
data
->
clusterId
;
if
(
epstr
)
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
tsEps
.
dropped
=
data
->
dropped
;
}
dInfo
(
"dnodeId is set to %d, clusterId is set to %"
PRId64
,
data
->
dnodeId
,
data
->
clusterId
);
dnodeWriteEps
();
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
}
int32_t
dnodeGetDnodeId
()
{
int32_t
dnodeId
=
0
;
pthread_mutex_lock
(
&
tsEps
.
mutex
);
dnodeId
=
tsEps
.
dnodeId
;
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
return
dnodeId
;
}
pthread_mutex_unlock
(
&
tsDeps
.
mutex
);
int64_t
dnodeGetClusterId
()
{
int64_t
clusterId
=
0
;
pthread_mutex_lock
(
&
tsEps
.
mutex
);
clusterId
=
tsEps
.
clusterId
;
pthread_mutex_unlock
(
&
tsEps
.
mutex
);
return
clusterId
;
}
}
source/server/dnode/src/dnodeInt.c
浏览文件 @
73b7bef1
...
@@ -14,67 +14,176 @@
...
@@ -14,67 +14,176 @@
*/
*/
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#if 0
#include "qScript.h"
#include "tfile.h"
#include "tsync.h"
#include "twal.h"
#endif
#include "tstep.h"
#include "dnodeCfg.h"
#include "dnodeCheck.h"
#include "dnodeCheck.h"
#include "dnodeDnodeEps.h"
#include "dnodeEps.h"
#include "dnodeMain.h"
#include "dnodeMsg.h"
#include "dnodeMnodeEps.h"
#include "dnodeStatus.h"
#include "dnodeTrans.h"
#include "dnodeTrans.h"
#include "mnode.h"
#include "mnode.h"
#include "sync.h"
#include "tcache.h"
#include "tconfig.h"
#include "tnote.h"
#include "tstep.h"
#include "vnode.h"
#include "vnode.h"
#include "wal.h"
static
struct
{
EDnStat
runStatus
;
SStartupStep
startup
;
SSteps
*
steps
;
}
tsDnode
;
EDnStat
dnodeGetRunStat
()
{
return
tsDnode
.
runStatus
;
}
void
dnodeSetRunStat
(
EDnStat
stat
)
{
tsDnode
.
runStatus
=
stat
;
}
static
void
dnodeReportStartup
(
char
*
name
,
char
*
desc
)
{
SStartupStep
*
startup
=
&
tsDnode
.
startup
;
tstrncpy
(
startup
->
name
,
name
,
strlen
(
startup
->
name
));
tstrncpy
(
startup
->
desc
,
desc
,
strlen
(
startup
->
desc
));
startup
->
finished
=
0
;
}
static
void
dnodeReportStartupFinished
(
char
*
name
,
char
*
desc
)
{
SStartupStep
*
startup
=
&
tsDnode
.
startup
;
tstrncpy
(
startup
->
name
,
name
,
strlen
(
startup
->
name
));
tstrncpy
(
startup
->
desc
,
desc
,
strlen
(
startup
->
desc
));
startup
->
finished
=
1
;
}
static
struct
SSteps
*
tsSteps
;
void
dnodeGetStartup
(
SStartupStep
*
pStep
)
{
memcpy
(
pStep
,
&
tsDnode
.
startup
,
sizeof
(
SStartupStep
));
}
static
int32_t
dnodeInitVnode
Module
(
void
**
unused
)
{
static
int32_t
dnodeInitVnode
(
)
{
SVnodePara
para
;
SVnodePara
para
;
para
.
fp
.
GetDnodeEp
=
dnodeGet
Dnode
Ep
;
para
.
fp
.
GetDnodeEp
=
dnodeGetEp
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
return
vnodeInit
(
para
);
return
vnodeInit
(
para
);
}
}
static
int32_t
dnodeInitMnode
Module
(
void
**
unused
)
{
static
int32_t
dnodeInitMnode
(
)
{
SMnodePara
para
;
SMnodePara
para
;
para
.
fp
.
GetDnodeEp
=
dnodeGet
Dnode
Ep
;
para
.
fp
.
GetDnodeEp
=
dnodeGetEp
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendRedirectMsg
=
dnodeSendRedirectMsg
;
para
.
fp
.
SendRedirectMsg
=
dnodeSendRedirectMsg
;
dnodeGetCfg
(
&
para
.
dnodeId
,
para
.
clusterId
);
para
.
dnodeId
=
dnodeGetDnodeId
();
para
.
clusterId
=
dnodeGetClusterId
();
return
mnodeInit
(
para
);
return
mnodeInit
(
para
);
}
}
static
int32_t
dnodeInitTfs
()
{}
static
int32_t
dnodeInitMain
()
{
tsDnode
.
runStatus
=
DN_RUN_STAT_STOPPED
;
tscEmbedded
=
1
;
taosIgnSIGPIPE
();
taosBlockSIGPIPE
();
taosResolveCRC
();
taosInitGlobalCfg
();
taosReadGlobalLogCfg
();
taosSetCoreDump
(
tsEnableCoreFile
);
if
(
!
taosMkDir
(
tsLogDir
))
{
printf
(
"failed to create dir: %s, reason: %s
\n
"
,
tsLogDir
,
strerror
(
errno
));
return
-
1
;
}
char
temp
[
TSDB_FILENAME_LEN
];
sprintf
(
temp
,
"%s/taosdlog"
,
tsLogDir
);
if
(
taosInitLog
(
temp
,
tsNumOfLogLines
,
1
)
<
0
)
{
printf
(
"failed to init log file
\n
"
);
}
if
(
!
taosReadGlobalCfg
())
{
taosPrintGlobalCfg
();
dError
(
"TDengine read global config failed"
);
return
-
1
;
}
dInfo
(
"start to initialize TDengine"
);
taosInitNotes
();
return
taosCheckGlobalCfg
();
}
static
void
dnodeCleanupMain
()
{
taos_cleanup
();
taosCloseLog
();
taosStopCacheRefreshWorker
();
}
static
int32_t
dnodeCheckRunning
(
char
*
dir
)
{
char
filepath
[
256
]
=
{
0
};
snprintf
(
filepath
,
sizeof
(
filepath
),
"%s/.running"
,
dir
);
FileFd
fd
=
taosOpenFileCreateWriteTrunc
(
filepath
);
if
(
fd
<
0
)
{
dError
(
"failed to open lock file:%s since %s, quit"
,
filepath
,
strerror
(
errno
));
return
-
1
;
}
int32_t
ret
=
taosLockFile
(
fd
);
if
(
ret
!=
0
)
{
dError
(
"failed to lock file:%s since %s, quit"
,
filepath
,
strerror
(
errno
));
taosCloseFile
(
fd
);
return
-
1
;
}
return
0
;
}
static
int32_t
dnodeInitDir
()
{
sprintf
(
tsMnodeDir
,
"%s/mnode"
,
tsDataDir
);
sprintf
(
tsVnodeDir
,
"%s/vnode"
,
tsDataDir
);
sprintf
(
tsDnodeDir
,
"%s/dnode"
,
tsDataDir
);
if
(
!
taosMkDir
(
tsDnodeDir
))
{
dError
(
"failed to create dir:%s since %s"
,
tsDnodeDir
,
strerror
(
errno
));
return
-
1
;
}
if
(
!
taosMkDir
(
tsMnodeDir
))
{
dError
(
"failed to create dir:%s since %s"
,
tsMnodeDir
,
strerror
(
errno
));
return
-
1
;
}
if
(
!
taosMkDir
(
tsVnodeDir
))
{
dError
(
"failed to create dir:%s since %s"
,
tsVnodeDir
,
strerror
(
errno
));
return
-
1
;
}
if
(
dnodeCheckRunning
(
tsDnodeDir
)
!=
0
)
{
return
-
1
;
}
return
0
;
}
static
void
dnodeCleanupDir
()
{}
int32_t
dnodeInit
()
{
int32_t
dnodeInit
()
{
tsSteps
=
taosStepInit
(
24
,
dnodeReportStartup
);
SSteps
*
steps
=
taosStepInit
(
24
,
dnodeReportStartup
);
if
(
tsSteps
==
NULL
)
return
-
1
;
if
(
steps
==
NULL
)
return
-
1
;
taosStepAdd
(
tsSteps
,
"dnode-main"
,
dnodeInitMain
,
dnodeCleanupMain
);
taosStepAdd
(
steps
,
"dnode-main"
,
dnodeInitMain
,
dnodeCleanupMain
);
taosStepAdd
(
tsSteps
,
"dnode-storage"
,
dnodeInitStorage
,
dnodeCleanupStorage
);
taosStepAdd
(
steps
,
"dnode-dir"
,
dnodeInitDir
,
dnodeCleanupDir
);
//taosStepAdd(tsSteps, "dnode-tfs", tfInit, tfCleanup);
taosStepAdd
(
steps
,
"dnode-check"
,
dnodeInitCheck
,
dnodeCleanupCheck
);
taosStepAdd
(
tsSteps
,
"dnode-rpc"
,
rpcInit
,
rpcCleanup
);
taosStepAdd
(
steps
,
"dnode-rpc"
,
rpcInit
,
rpcCleanup
);
taosStepAdd
(
tsSteps
,
"dnode-check"
,
dnodeInitCheck
,
dnodeCleanupCheck
);
taosStepAdd
(
steps
,
"dnode-tfs"
,
dnodeInitTfs
,
NULL
);
taosStepAdd
(
tsSteps
,
"dnode-cfg"
,
dnodeInitCfg
,
dnodeCleanupCfg
);
taosStepAdd
(
steps
,
"dnode-wal"
,
walInit
,
walCleanUp
);
taosStepAdd
(
tsSteps
,
"dnode-deps"
,
dnodeInitDnodeEps
,
dnodeCleanupDnodeEps
);
taosStepAdd
(
steps
,
"dnode-sync"
,
syncInit
,
syncCleanUp
);
taosStepAdd
(
tsSteps
,
"dnode-meps"
,
dnodeInitMnodeEps
,
dnodeCleanupMnodeEps
);
taosStepAdd
(
steps
,
"dnode-eps"
,
dnodeInitEps
,
dnodeCleanupEps
);
//taosStepAdd(tsSteps, "dnode-wal", walInit, walCleanUp);
taosStepAdd
(
steps
,
"dnode-vnode"
,
dnodeInitVnode
,
vnodeCleanup
);
//taosStepAdd(tsSteps, "dnode-sync", syncInit, syncCleanUp);
taosStepAdd
(
steps
,
"dnode-mnode"
,
dnodeInitMnode
,
mnodeCleanup
);
taosStepAdd
(
tsSteps
,
"dnode-vnode"
,
dnodeInitVnodeModule
,
vnodeCleanup
);
taosStepAdd
(
steps
,
"dnode-trans"
,
dnodeInitTrans
,
dnodeCleanupTrans
);
taosStepAdd
(
tsSteps
,
"dnode-mnode"
,
dnodeInitMnodeModule
,
mnodeCleanup
);
taosStepAdd
(
steps
,
"dnode-msg"
,
dnodeInitMsg
,
dnodeCleanupMsg
);
taosStepAdd
(
tsSteps
,
"dnode-trans"
,
dnodeInitTrans
,
dnodeCleanupTrans
);
taosStepAdd
(
tsSteps
,
"dnode-status"
,
dnodeInitStatus
,
dnodeCleanupStatus
);
tsDnode
.
steps
=
steps
;
//taosStepAdd(tsSteps, "dnode-script",scriptEnvPoolInit, scriptEnvPoolCleanup);
taosStepExec
(
tsDnode
.
steps
);
taosStepExec
(
tsSteps
);
dnodeSetRunStat
(
DN_RUN_STAT_RUNNING
);
dnodeSetRunStat
(
DN_RUN_STAT_RUNNING
);
dnodeReportStartupFinished
(
"TDengine"
,
"initialized successfully"
);
dnodeReportStartupFinished
(
"TDengine"
,
"initialized successfully"
);
...
@@ -86,7 +195,7 @@ int32_t dnodeInit() {
...
@@ -86,7 +195,7 @@ int32_t dnodeInit() {
void
dnodeCleanup
()
{
void
dnodeCleanup
()
{
if
(
dnodeGetRunStat
()
!=
DN_RUN_STAT_STOPPED
)
{
if
(
dnodeGetRunStat
()
!=
DN_RUN_STAT_STOPPED
)
{
dnodeSetRunStat
(
DN_RUN_STAT_STOPPED
);
dnodeSetRunStat
(
DN_RUN_STAT_STOPPED
);
taosStepCleanup
(
ts
S
teps
);
taosStepCleanup
(
ts
Dnode
.
s
teps
);
ts
S
teps
=
NULL
;
ts
Dnode
.
s
teps
=
NULL
;
}
}
}
}
source/server/dnode/src/dnodeMain.c
已删除
100644 → 0
浏览文件 @
78001a28
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#if 0
#include "tfs.h"
#endif
#include "tnote.h"
#include "tcompression.h"
#include "ttimer.h"
#include "dnodeCfg.h"
#include "dnodeMain.h"
#include "mnode.h"
static
struct
{
EDnStat
runStatus
;
void
*
dnodeTimer
;
SStartupStep
startup
;
}
tsDmain
;
static
void
dnodeCheckDataDirOpenned
(
char
*
dir
)
{
#if 0
char filepath[256] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dir);
int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (fd < 0) {
dError("failed to open lock file:%s, reason: %s, quit", filepath, strerror(errno));
exit(0);
}
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
if (ret != 0) {
dError("failed to lock file:%s ret:%d since %s, database may be running, quit", filepath, ret, strerror(errno));
close(fd);
exit(0);
}
#endif
}
int32_t
dnodeInitMain
()
{
tsDmain
.
runStatus
=
DN_RUN_STAT_STOPPED
;
tsDmain
.
dnodeTimer
=
taosTmrInit
(
100
,
200
,
60000
,
"DND-TMR"
);
if
(
tsDmain
.
dnodeTimer
==
NULL
)
{
dError
(
"failed to init dnode timer"
);
return
-
1
;
}
tscEmbedded
=
1
;
taosIgnSIGPIPE
();
taosBlockSIGPIPE
();
taosResolveCRC
();
taosInitGlobalCfg
();
taosReadGlobalLogCfg
();
taosSetCoreDump
(
tsEnableCoreFile
);
if
(
!
taosMkDir
(
tsLogDir
))
{
printf
(
"failed to create dir: %s, reason: %s
\n
"
,
tsLogDir
,
strerror
(
errno
));
return
-
1
;
}
char
temp
[
TSDB_FILENAME_LEN
];
sprintf
(
temp
,
"%s/taosdlog"
,
tsLogDir
);
if
(
taosInitLog
(
temp
,
tsNumOfLogLines
,
1
)
<
0
)
{
printf
(
"failed to init log file
\n
"
);
}
if
(
!
taosReadGlobalCfg
())
{
taosPrintGlobalCfg
();
dError
(
"TDengine read global config failed"
);
return
-
1
;
}
dInfo
(
"start to initialize TDengine"
);
taosInitNotes
();
return
taosCheckGlobalCfg
();
}
void
dnodeCleanupMain
()
{
if
(
tsDmain
.
dnodeTimer
!=
NULL
)
{
taosTmrCleanUp
(
tsDmain
.
dnodeTimer
);
tsDmain
.
dnodeTimer
=
NULL
;
}
#if 0
taos_cleanup();
#endif
taosCloseLog
();
taosStopCacheRefreshWorker
();
}
int32_t
dnodeInitStorage
()
{
#ifdef TD_TSZ
// compress module init
tsCompressInit
();
#endif
// storage module init
if
(
tsDiskCfgNum
==
1
&&
!
taosMkDir
(
tsDataDir
))
{
dError
(
"failed to create dir:%s since %s"
,
tsDataDir
,
strerror
(
errno
));
return
-
1
;
}
#if 0
if (tfsInit(tsDiskCfg, tsDiskCfgNum) < 0) {
dError("failed to init TFS since %s", tstrerror(terrno));
return -1;
}
strncpy(tsDataDir, TFS_PRIMARY_PATH(), TSDB_FILENAME_LEN);
#endif
sprintf
(
tsMnodeDir
,
"%s/mnode"
,
tsDataDir
);
sprintf
(
tsVnodeDir
,
"%s/vnode"
,
tsDataDir
);
sprintf
(
tsDnodeDir
,
"%s/dnode"
,
tsDataDir
);
if
(
!
taosMkDir
(
tsMnodeDir
))
{
dError
(
"failed to create dir:%s since %s"
,
tsMnodeDir
,
strerror
(
errno
));
return
-
1
;
}
if
(
!
taosMkDir
(
tsDnodeDir
))
{
dError
(
"failed to create dir:%s since %s"
,
tsDnodeDir
,
strerror
(
errno
));
return
-
1
;
}
#if 0
if (tfsMkdir("vnode") < 0) {
dError("failed to create vnode dir since %s", tstrerror(terrno));
return -1;
}
if (tfsMkdir("vnode_bak") < 0) {
dError("failed to create vnode_bak dir since %s", tstrerror(terrno));
return -1;
}
TDIR *tdir = tfsOpendir("vnode_bak/.staging");
bool stagingNotEmpty = tfsReaddir(tdir) != NULL;
tfsClosedir(tdir);
if (stagingNotEmpty) {
dError("vnode_bak/.staging dir not empty, fix it first.");
return -1;
}
if (tfsMkdir("vnode_bak/.staging") < 0) {
dError("failed to create vnode_bak/.staging dir since %s", tstrerror(terrno));
return -1;
}
dnodeCheckDataDirOpenned(tsDnodeDir);
taosGetDisk();
dnodePrintDiskInfo();
#endif
dInfo
(
"dnode storage is initialized at %s"
,
tsDnodeDir
);
return
0
;
}
void
dnodeCleanupStorage
()
{
#if 0
// storage destroy
tfsDestroy();
#ifdef TD_TSZ
// compress destroy
tsCompressExit();
#endif
#endif
}
void
dnodeReportStartup
(
char
*
name
,
char
*
desc
)
{
SStartupStep
*
startup
=
&
tsDmain
.
startup
;
tstrncpy
(
startup
->
name
,
name
,
strlen
(
startup
->
name
));
tstrncpy
(
startup
->
desc
,
desc
,
strlen
(
startup
->
desc
));
startup
->
finished
=
0
;
}
void
dnodeReportStartupFinished
(
char
*
name
,
char
*
desc
)
{
SStartupStep
*
startup
=
&
tsDmain
.
startup
;
tstrncpy
(
startup
->
name
,
name
,
strlen
(
startup
->
name
));
tstrncpy
(
startup
->
desc
,
desc
,
strlen
(
startup
->
desc
));
startup
->
finished
=
1
;
}
void
dnodeProcessStartupReq
(
SRpcMsg
*
pMsg
)
{
dInfo
(
"startup msg is received, cont:%s"
,
(
char
*
)
pMsg
->
pCont
);
SStartupStep
*
pStep
=
rpcMallocCont
(
sizeof
(
SStartupStep
));
memcpy
(
pStep
,
&
tsDmain
.
startup
,
sizeof
(
SStartupStep
));
dDebug
(
"startup msg is sent, step:%s desc:%s finished:%d"
,
pStep
->
name
,
pStep
->
desc
,
pStep
->
finished
);
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
pStep
,
.
contLen
=
sizeof
(
SStartupStep
)};
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pMsg
->
pCont
);
}
static
int32_t
dnodeStartMnode
(
SRpcMsg
*
pMsg
)
{
SCreateMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
if
(
pCfg
->
dnodeId
!=
dnodeGetDnodeId
())
{
dDebug
(
"dnode:%d, in create meps msg is not equal with saved dnodeId:%d"
,
pCfg
->
dnodeId
,
dnodeGetDnodeId
());
return
TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED
;
}
if
(
strcmp
(
pCfg
->
dnodeEp
,
tsLocalEp
)
!=
0
)
{
dDebug
(
"dnodeEp:%s, in create meps msg is not equal with saved dnodeEp:%s"
,
pCfg
->
dnodeEp
,
tsLocalEp
);
return
TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED
;
}
dDebug
(
"dnode:%d, create meps msg is received from mnodes, numOfMnodes:%d"
,
pCfg
->
dnodeId
,
pCfg
->
mnodes
.
mnodeNum
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
mnodes
.
mnodeNum
;
++
i
)
{
pCfg
->
mnodes
.
mnodeInfos
[
i
].
mnodeId
=
htonl
(
pCfg
->
mnodes
.
mnodeInfos
[
i
].
mnodeId
);
dDebug
(
"meps index:%d, meps:%d:%s"
,
i
,
pCfg
->
mnodes
.
mnodeInfos
[
i
].
mnodeId
,
pCfg
->
mnodes
.
mnodeInfos
[
i
].
mnodeEp
);
}
if
(
mnodeGetStatus
()
==
MN_STATUS_READY
)
return
0
;
return
mnodeDeploy
();
}
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
dnodeStartMnode
(
pMsg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
)
{
SCfgDnodeMsg
*
pCfg
=
pMsg
->
pCont
;
int32_t
code
=
taosCfgDynamicOptions
(
pCfg
->
config
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
EDnStat
dnodeGetRunStat
()
{
return
tsDmain
.
runStatus
;
}
void
dnodeSetRunStat
(
EDnStat
stat
)
{
tsDmain
.
runStatus
=
stat
;
}
void
*
dnodeGetTimer
()
{
return
tsDmain
.
dnodeTimer
;
}
\ No newline at end of file
source/server/dnode/src/dnodeMnodeEps.c
已删除
100644 → 0
浏览文件 @
78001a28
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "dnodeCfg.h"
#include "dnodeDnodeEps.h"
#include "dnodeMnodeEps.h"
#include "mnode.h"
static
struct
{
SRpcEpSet
mnodeEpSet
;
SMInfos
mnodeInfos
;
char
file
[
PATH_MAX
+
20
];
pthread_mutex_t
mutex
;
}
tsDmeps
;
static
void
dnodePrintMnodeEps
()
{
SRpcEpSet
*
epset
=
&
tsDmeps
.
mnodeEpSet
;
dInfo
(
"print mnode eps, num:%d inuse:%d"
,
epset
->
numOfEps
,
epset
->
inUse
);
for
(
int32_t
i
=
0
;
i
<
epset
->
numOfEps
;
i
++
)
{
dInfo
(
"ep index:%d, %s:%u"
,
i
,
epset
->
fqdn
[
i
],
epset
->
port
[
i
]);
}
}
static
void
dnodeResetMnodeEps
(
SMInfos
*
mInfos
)
{
if
(
mInfos
==
NULL
||
mInfos
->
mnodeNum
==
0
)
{
tsDmeps
.
mnodeEpSet
.
numOfEps
=
1
;
taosGetFqdnPortFromEp
(
tsFirst
,
tsDmeps
.
mnodeEpSet
.
fqdn
[
0
],
&
tsDmeps
.
mnodeEpSet
.
port
[
0
]);
if
(
strcmp
(
tsSecond
,
tsFirst
)
!=
0
)
{
tsDmeps
.
mnodeEpSet
.
numOfEps
=
2
;
taosGetFqdnPortFromEp
(
tsSecond
,
tsDmeps
.
mnodeEpSet
.
fqdn
[
1
],
&
tsDmeps
.
mnodeEpSet
.
port
[
1
]);
}
dnodePrintMnodeEps
();
return
;
}
int32_t
size
=
sizeof
(
SMInfos
);
memcpy
(
&
tsDmeps
.
mnodeInfos
,
mInfos
,
size
);
tsDmeps
.
mnodeEpSet
.
inUse
=
tsDmeps
.
mnodeInfos
.
inUse
;
tsDmeps
.
mnodeEpSet
.
numOfEps
=
tsDmeps
.
mnodeInfos
.
mnodeNum
;
for
(
int32_t
i
=
0
;
i
<
tsDmeps
.
mnodeInfos
.
mnodeNum
;
i
++
)
{
taosGetFqdnPortFromEp
(
tsDmeps
.
mnodeInfos
.
mnodeInfos
[
i
].
mnodeEp
,
tsDmeps
.
mnodeEpSet
.
fqdn
[
i
],
&
tsDmeps
.
mnodeEpSet
.
port
[
i
]);
}
dnodePrintMnodeEps
();
}
static
int32_t
dnodeWriteMnodeEps
()
{
FILE
*
fp
=
fopen
(
tsDmeps
.
file
,
"w"
);
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
tsDmeps
.
file
,
strerror
(
errno
));
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
2000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
inUse
\"
: %d,
\n
"
,
tsDmeps
.
mnodeInfos
.
inUse
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
nodeNum
\"
: %d,
\n
"
,
tsDmeps
.
mnodeInfos
.
mnodeNum
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
nodeInfos
\"
: [{
\n
"
);
for
(
int32_t
i
=
0
;
i
<
tsDmeps
.
mnodeInfos
.
mnodeNum
;
i
++
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
nodeId
\"
: %d,
\n
"
,
tsDmeps
.
mnodeInfos
.
mnodeInfos
[
i
].
mnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
nodeEp
\"
:
\"
%s
\"\n
"
,
tsDmeps
.
mnodeInfos
.
mnodeInfos
[
i
].
mnodeEp
);
if
(
i
<
tsDmeps
.
mnodeInfos
.
mnodeNum
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }]
\n
"
);
}
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
dInfo
(
"successed to write %s"
,
tsDmeps
.
file
);
return
0
;
}
static
int32_t
dnodeReadMnodeEps
()
{
int32_t
len
=
0
;
int32_t
maxLen
=
2000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
SMInfos
mInfos
=
{
0
};
bool
nodeChanged
=
false
;
fp
=
fopen
(
tsDmeps
.
file
,
"r"
);
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
tsDmeps
.
file
);
goto
PARSE_MINFOS_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
tsDmeps
.
file
);
goto
PARSE_MINFOS_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
tsDmeps
.
file
);
goto
PARSE_MINFOS_OVER
;
}
cJSON
*
inUse
=
cJSON_GetObjectItem
(
root
,
"inUse"
);
if
(
!
inUse
||
inUse
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read mnodeEpSet.json since inUse not found"
);
goto
PARSE_MINFOS_OVER
;
}
tsDmeps
.
mnodeInfos
.
inUse
=
(
int8_t
)
inUse
->
valueint
;
cJSON
*
nodeNum
=
cJSON_GetObjectItem
(
root
,
"nodeNum"
);
if
(
!
nodeNum
||
nodeNum
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read mnodeEpSet.json since nodeNum not found"
);
goto
PARSE_MINFOS_OVER
;
}
mInfos
.
mnodeNum
=
(
int8_t
)
nodeNum
->
valueint
;
cJSON
*
nodeInfos
=
cJSON_GetObjectItem
(
root
,
"nodeInfos"
);
if
(
!
nodeInfos
||
nodeInfos
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read mnodeEpSet.json since nodeInfos not found"
);
goto
PARSE_MINFOS_OVER
;
}
int32_t
size
=
cJSON_GetArraySize
(
nodeInfos
);
if
(
size
!=
mInfos
.
mnodeNum
)
{
dError
(
"failed to read mnodeEpSet.json since nodeInfos size not matched"
);
goto
PARSE_MINFOS_OVER
;
}
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
cJSON
*
nodeInfo
=
cJSON_GetArrayItem
(
nodeInfos
,
i
);
if
(
nodeInfo
==
NULL
)
continue
;
cJSON
*
nodeId
=
cJSON_GetObjectItem
(
nodeInfo
,
"nodeId"
);
if
(
!
nodeId
||
nodeId
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read mnodeEpSet.json since nodeId not found"
);
goto
PARSE_MINFOS_OVER
;
}
cJSON
*
nodeEp
=
cJSON_GetObjectItem
(
nodeInfo
,
"nodeEp"
);
if
(
!
nodeEp
||
nodeEp
->
type
!=
cJSON_String
||
nodeEp
->
valuestring
==
NULL
)
{
dError
(
"failed to read mnodeEpSet.json since nodeName not found"
);
goto
PARSE_MINFOS_OVER
;
}
SMInfo
*
mInfo
=
&
mInfos
.
mnodeInfos
[
i
];
mInfo
->
mnodeId
=
(
int32_t
)
nodeId
->
valueint
;
tstrncpy
(
mInfo
->
mnodeEp
,
nodeEp
->
valuestring
,
TSDB_EP_LEN
);
bool
changed
=
dnodeIsDnodeEpChanged
(
mInfo
->
mnodeId
,
mInfo
->
mnodeEp
);
if
(
changed
)
nodeChanged
=
changed
;
}
dInfo
(
"successed to read file %s"
,
tsDmeps
.
file
);
PARSE_MINFOS_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
terrno
=
0
;
for
(
int32_t
i
=
0
;
i
<
mInfos
.
mnodeNum
;
++
i
)
{
SMInfo
*
mInfo
=
&
mInfos
.
mnodeInfos
[
i
];
dnodeGetDnodeEp
(
mInfo
->
mnodeId
,
mInfo
->
mnodeEp
,
NULL
,
NULL
);
}
dnodeResetMnodeEps
(
&
mInfos
);
if
(
nodeChanged
)
{
dnodeWriteMnodeEps
();
}
return
0
;
}
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
)
{
SRpcConnInfo
connInfo
=
{
0
};
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
);
SRpcEpSet
epSet
=
{
0
};
if
(
forShell
)
{
dnodeGetEpSetForShell
(
&
epSet
);
}
else
{
dnodeGetEpSetForPeer
(
&
epSet
);
}
dDebug
(
"msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d"
,
taosMsg
[
rpcMsg
->
msgType
],
taosIpStr
(
connInfo
.
clientIp
),
connInfo
.
user
,
epSet
.
numOfEps
,
epSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
dDebug
(
"mnode index:%d %s:%d"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
]);
if
(
strcmp
(
epSet
.
fqdn
[
i
],
tsLocalFqdn
)
==
0
)
{
if
((
epSet
.
port
[
i
]
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
&&
!
forShell
)
||
(
epSet
.
port
[
i
]
==
tsServerPort
&&
forShell
))
{
epSet
.
inUse
=
(
i
+
1
)
%
epSet
.
numOfEps
;
dDebug
(
"mnode index:%d %s:%d set inUse to %d"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
],
epSet
.
inUse
);
}
}
epSet
.
port
[
i
]
=
htons
(
epSet
.
port
[
i
]);
}
rpcSendRedirectRsp
(
rpcMsg
->
handle
,
&
epSet
);
}
int32_t
dnodeInitMnodeEps
()
{
snprintf
(
tsDmeps
.
file
,
sizeof
(
tsDmeps
.
file
),
"%s/mnodeEpSet.json"
,
tsDnodeDir
);
pthread_mutex_init
(
&
tsDmeps
.
mutex
,
NULL
);
dnodeResetMnodeEps
(
NULL
);
int32_t
ret
=
dnodeReadMnodeEps
();
if
(
ret
==
0
)
{
dInfo
(
"dnode mInfos is initialized"
);
}
return
ret
;
}
void
dnodeCleanupMnodeEps
()
{
pthread_mutex_destroy
(
&
tsDmeps
.
mutex
);
}
void
dnodeUpdateMnodeFromStatus
(
SMInfos
*
mInfos
)
{
if
(
mInfos
->
mnodeNum
<=
0
||
mInfos
->
mnodeNum
>
TSDB_MAX_REPLICA
)
{
dError
(
"invalid mInfos since num:%d invalid"
,
mInfos
->
mnodeNum
);
return
;
}
for
(
int32_t
i
=
0
;
i
<
mInfos
->
mnodeNum
;
++
i
)
{
SMInfo
*
minfo
=
&
mInfos
->
mnodeInfos
[
i
];
minfo
->
mnodeId
=
htonl
(
minfo
->
mnodeId
);
if
(
minfo
->
mnodeId
<=
0
||
strlen
(
minfo
->
mnodeEp
)
<=
5
)
{
dError
(
"invalid mInfo:%d since id:%d and ep:%s invalid"
,
i
,
minfo
->
mnodeId
,
minfo
->
mnodeEp
);
return
;
}
}
pthread_mutex_lock
(
&
tsDmeps
.
mutex
);
if
(
mInfos
->
mnodeNum
!=
tsDmeps
.
mnodeInfos
.
mnodeNum
)
{
dnodeResetMnodeEps
(
mInfos
);
dnodeWriteMnodeEps
();
}
else
{
int32_t
size
=
sizeof
(
SMInfos
);
if
(
memcmp
(
mInfos
,
&
tsDmeps
.
mnodeInfos
,
size
)
!=
0
)
{
dnodeResetMnodeEps
(
mInfos
);
dnodeWriteMnodeEps
();
}
}
pthread_mutex_unlock
(
&
tsDmeps
.
mutex
);
}
void
dnodeUpdateMnodeFromPeer
(
SRpcEpSet
*
ep
)
{
if
(
ep
->
numOfEps
<=
0
)
{
dError
(
"mInfos is changed, but content is invalid, discard it"
);
return
;
}
pthread_mutex_lock
(
&
tsDmeps
.
mutex
);
dInfo
(
"mInfos is changed, numOfEps:%d inUse:%d"
,
ep
->
numOfEps
,
ep
->
inUse
);
for
(
int32_t
i
=
0
;
i
<
ep
->
numOfEps
;
++
i
)
{
ep
->
port
[
i
]
-=
TSDB_PORT_DNODEDNODE
;
dInfo
(
"minfo:%d %s:%u"
,
i
,
ep
->
fqdn
[
i
],
ep
->
port
[
i
]);
}
tsDmeps
.
mnodeEpSet
=
*
ep
;
pthread_mutex_unlock
(
&
tsDmeps
.
mutex
);
}
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsDmeps
.
mutex
);
*
epSet
=
tsDmeps
.
mnodeEpSet
;
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
epSet
->
port
[
i
]
+=
TSDB_PORT_DNODEDNODE
;
}
pthread_mutex_unlock
(
&
tsDmeps
.
mutex
);
}
void
dnodeGetEpSetForShell
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsDmeps
.
mutex
);
*
epSet
=
tsDmeps
.
mnodeEpSet
;
pthread_mutex_unlock
(
&
tsDmeps
.
mutex
);
}
source/server/dnode/src/dnode
Status
.c
→
source/server/dnode/src/dnode
Msg
.c
浏览文件 @
73b7bef1
...
@@ -14,67 +14,44 @@
...
@@ -14,67 +14,44 @@
*/
*/
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "tthread.h"
#include "ttime.h"
#include "dnodeEps.h"
#include "ttimer.h"
#include "dnodeMsg.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnodeCfg.h"
#include "dnodeDnodeEps.h"
#include "dnodeMnodeEps.h"
#include "dnodeStatus.h"
#include "dnodeMain.h"
#include "vnode.h"
#include "vnode.h"
#include "ttime.h"
static
struct
{
static
struct
{
void
*
dnodeTimer
;
pthread_t
*
threadId
;
void
*
statusTimer
;
bool
stop
;
uint32_t
rebootTime
;
uint32_t
rebootTime
;
}
tsStatus
;
}
tsMsg
;
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
)
{
if
(
tsStatus
.
statusTimer
==
NULL
)
{
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsStatus
.
dnodeTimer
,
&
tsStatus
.
statusTimer
);
dError
(
"failed to start status timer"
);
return
;
}
static
void
dnodeSendStatusMsg
()
{
int32_t
contLen
=
sizeof
(
SStatusMsg
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeLoad
);
int32_t
contLen
=
sizeof
(
SStatusMsg
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeLoad
);
SStatusMsg
*
pStatus
=
rpcMallocCont
(
contLen
);
SStatusMsg
*
pStatus
=
rpcMallocCont
(
contLen
);
if
(
pStatus
==
NULL
)
{
if
(
pStatus
==
NULL
)
{
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsStatus
.
dnodeTimer
,
&
tsStatus
.
statusTimer
);
dError
(
"failed to malloc status message"
);
dError
(
"failed to malloc status message"
);
return
;
return
;
}
}
dnodeGetCfg
(
&
pStatus
->
dnodeId
,
pStatus
->
clusterId
);
pStatus
->
dnodeId
=
htonl
(
dnodeGetDnodeId
());
pStatus
->
version
=
htonl
(
tsVersion
);
pStatus
->
version
=
htonl
(
tsVersion
);
pStatus
->
lastReboot
=
htonl
(
tsStatus
.
rebootTime
);
pStatus
->
dnodeId
=
htonl
(
dnodeGetDnodeId
());
pStatus
->
numOfCores
=
htons
((
uint16_t
)
tsNumOfCores
);
pStatus
->
diskAvailable
=
tsAvailDataDirGB
;
pStatus
->
alternativeRole
=
tsAlternativeRole
;
tstrncpy
(
pStatus
->
dnodeEp
,
tsLocalEp
,
TSDB_EP_LEN
);
tstrncpy
(
pStatus
->
dnodeEp
,
tsLocalEp
,
TSDB_EP_LEN
);
pStatus
->
clusterId
=
htobe64
(
dnodeGetClusterId
());
pStatus
->
lastReboot
=
htonl
(
tsMsg
.
rebootTime
);
pStatus
->
numOfCores
=
htonl
(
tsNumOfCores
);
pStatus
->
diskAvailable
=
tsAvailDataDirGB
;
// fill cluster cfg parameters
// fill cluster cfg parameters
pStatus
->
clusterCfg
.
numOfMnodes
=
htonl
(
tsNumOfMnodes
);
pStatus
->
clusterCfg
.
mnodeEqualVnodeNum
=
htonl
(
tsMnodeEqualVnodeNum
);
pStatus
->
clusterCfg
.
offlineThreshold
=
htonl
(
tsOfflineThreshold
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
tsStatusInterval
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
tsStatusInterval
);
pStatus
->
clusterCfg
.
maxtablesPerVnode
=
htonl
(
tsMaxTablePerVnode
);
pStatus
->
clusterCfg
.
maxVgroupsPerDb
=
htonl
(
tsMaxVgroupsPerDb
);
tstrncpy
(
pStatus
->
clusterCfg
.
arbitrator
,
tsArbitrator
,
TSDB_EP_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
timezone
,
tsTimezone
,
64
);
pStatus
->
clusterCfg
.
checkTime
=
0
;
pStatus
->
clusterCfg
.
checkTime
=
0
;
tstrncpy
(
pStatus
->
clusterCfg
.
timezone
,
tsTimezone
,
64
);
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
pStatus
->
clusterCfg
.
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
(
void
)
taosParseTime
(
timestr
,
&
pStatus
->
clusterCfg
.
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
tstrncpy
(
pStatus
->
clusterCfg
.
locale
,
tsLocale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
locale
,
tsLocale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
tsCharset
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
tsCharset
,
TSDB_LOCALE_LEN
);
pStatus
->
clusterCfg
.
enableBalance
=
tsEnableBalance
;
pStatus
->
clusterCfg
.
flowCtrl
=
tsEnableFlowCtrl
;
pStatus
->
clusterCfg
.
slaveQuery
=
tsEnableSlaveQuery
;
pStatus
->
clusterCfg
.
adjustMaster
=
tsEnableAdjustMaster
;
vnodeGetStatus
(
pStatus
);
vnodeGetStatus
(
pStatus
);
contLen
=
sizeof
(
SStatusMsg
)
+
pStatus
->
openVnodes
*
sizeof
(
SVnodeLoad
);
contLen
=
sizeof
(
SStatusMsg
)
+
pStatus
->
openVnodes
*
sizeof
(
SVnodeLoad
);
pStatus
->
openVnodes
=
htons
(
pStatus
->
openVnodes
);
pStatus
->
openVnodes
=
htons
(
pStatus
->
openVnodes
);
...
@@ -85,52 +62,113 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
...
@@ -85,52 +62,113 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
}
}
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
)
{
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
)
{
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
dTrace
(
"status rsp is received, code:%s"
,
tstrerror
(
pMsg
->
code
));
dError
(
"status rsp is received, error:%s"
,
tstrerror
(
pMsg
->
code
));
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
return
;
if
(
pMsg
->
code
==
TSDB_CODE_MND_DNODE_NOT_EXIST
)
{
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
dnodeGetClusterId
(
clusterId
);
if
(
clusterId
[
0
]
!=
'\0'
)
{
dnodeSetDropped
();
dError
(
"exit zombie dropped dnode"
);
exit
(
EXIT_FAILURE
);
}
}
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsStatus
.
dnodeTimer
,
&
tsStatus
.
statusTimer
);
return
;
}
SStatusRsp
*
pStatusRsp
=
pMsg
->
pCont
;
SStatusRsp
*
pStatusRsp
=
pMsg
->
pCont
;
SMInfos
*
minfos
=
&
pStatusRsp
->
mnodes
;
dnodeUpdateMnodeFromStatus
(
minfos
);
SDnodeCfg
*
pCfg
=
&
pStatusRsp
->
dnodeCfg
;
SDnodeCfg
*
pCfg
=
&
pStatusRsp
->
dnodeCfg
;
pCfg
->
numOfVnodes
=
htonl
(
pCfg
->
numOfVnodes
);
pCfg
->
moduleStatus
=
htonl
(
pCfg
->
moduleStatus
);
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
clusterId
=
htobe64
(
pCfg
->
clusterId
);
pCfg
->
numOfVnodes
=
htonl
(
pCfg
->
numOfVnodes
);
pCfg
->
numOfDnodes
=
htonl
(
pCfg
->
numOfDnodes
);
dnodeUpdateCfg
(
pCfg
);
dnodeUpdateCfg
(
pCfg
);
if
(
pCfg
->
dropped
)
{
dError
(
"status rsp is received, and set dnode to drop status"
);
return
;
}
vnodeSetAccess
(
pStatusRsp
->
vgAccess
,
pCfg
->
numOfVnodes
);
vnodeSetAccess
(
pStatusRsp
->
vgAccess
,
pCfg
->
numOfVnodes
);
SDnodeEps
*
pEps
=
(
SDnodeEps
*
)((
char
*
)
pStatusRsp
->
vgAccess
+
pCfg
->
numOfVnodes
*
sizeof
(
SVgroupAccess
));
SDnodeEps
*
eps
=
(
SDnodeEps
*
)((
char
*
)
pStatusRsp
->
vgAccess
+
pCfg
->
numOfVnodes
*
sizeof
(
SVgroupAccess
));
dnodeUpdateDnodeEps
(
pEps
);
eps
->
dnodeNum
=
htonl
(
eps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
eps
->
dnodeNum
;
++
i
)
{
eps
->
dnodeEps
[
i
].
dnodeId
=
htonl
(
eps
->
dnodeEps
[
i
].
dnodeId
);
eps
->
dnodeEps
[
i
].
dnodePort
=
htons
(
eps
->
dnodeEps
[
i
].
dnodePort
);
}
dnodeUpdateDnodeEps
(
eps
);
}
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsStatus
.
dnodeTimer
,
&
tsStatus
.
statusTimer
);
static
void
*
dnodeThreadRoutine
(
void
*
param
)
{
int32_t
ms
=
tsStatusInterval
*
1000
;
while
(
!
tsMsg
.
stop
)
{
taosMsleep
(
ms
);
dnodeSendStatusMsg
();
}
}
}
int32_t
dnodeInitStatus
()
{
int32_t
dnodeInitMsg
()
{
tsStatus
.
statusTimer
=
NULL
;
tsMsg
.
stop
=
false
;
tsStatus
.
dnodeTimer
=
dnodeGetTimer
();
tsMsg
.
rebootTime
=
taosGetTimestampSec
();
tsStatus
.
rebootTime
=
taosGetTimestampSec
();
tsMsg
.
threadId
=
taosCreateThread
(
dnodeThreadRoutine
,
NULL
);
taosTmrReset
(
dnodeSendStatusMsg
,
500
,
NULL
,
tsStatus
.
dnodeTimer
,
&
tsStatus
.
statusTimer
);
if
(
tsMsg
.
threadId
==
NULL
)
{
dInfo
(
"dnode status timer is initialized"
);
return
-
1
;
return
TSDB_CODE_SUCCESS
;
}
dInfo
(
"dnode msg is initialized"
);
return
0
;
}
}
void
dnodeCleanupStatus
()
{
void
dnodeCleanupMsg
()
{
if
(
tsStatus
.
statusTimer
!=
NULL
)
{
if
(
tsMsg
.
threadId
!=
NULL
)
{
taosTmrStopA
(
&
tsStatus
.
statusTimer
);
tsMsg
.
stop
=
true
;
tsStatus
.
statusTimer
=
NULL
;
taosDestoryThread
(
tsMsg
.
threadId
);
tsMsg
.
threadId
=
NULL
;
}
}
dInfo
(
"dnode msg is cleanuped"
);
}
static
int32_t
dnodeStartMnode
(
SRpcMsg
*
pMsg
)
{
SCreateMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
mnodeNum
=
htonl
(
pCfg
->
mnodeNum
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
mnodeNum
;
++
i
)
{
pCfg
->
mnodeEps
[
i
].
dnodeId
=
htonl
(
pCfg
->
mnodeEps
[
i
].
dnodeId
);
pCfg
->
mnodeEps
[
i
].
dnodePort
=
htons
(
pCfg
->
mnodeEps
[
i
].
dnodePort
);
}
if
(
pCfg
->
dnodeId
!=
dnodeGetDnodeId
())
{
dDebug
(
"dnode:%d, in create meps msg is not equal with saved dnodeId:%d"
,
pCfg
->
dnodeId
,
dnodeGetDnodeId
());
return
TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED
;
}
if
(
mnodeGetStatus
()
==
MN_STATUS_READY
)
return
0
;
return
mnodeDeploy
();
}
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
dnodeStartMnode
(
pMsg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
)
{
SCfgDnodeMsg
*
pCfg
=
pMsg
->
pCont
;
int32_t
code
=
taosCfgDynamicOptions
(
pCfg
->
config
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
void
dnodeProcessStartupReq
(
SRpcMsg
*
pMsg
)
{
dInfo
(
"startup msg is received, cont:%s"
,
(
char
*
)
pMsg
->
pCont
);
SStartupStep
*
pStep
=
rpcMallocCont
(
sizeof
(
SStartupStep
));
dnodeGetStartup
(
pStep
);
dDebug
(
"startup msg is sent, step:%s desc:%s finished:%d"
,
pStep
->
name
,
pStep
->
desc
,
pStep
->
finished
);
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
pStep
,
.
contLen
=
sizeof
(
SStartupStep
)};
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pMsg
->
pCont
);
}
}
source/server/dnode/src/dnodeTrans.c
浏览文件 @
73b7bef1
...
@@ -21,9 +21,8 @@
...
@@ -21,9 +21,8 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "dnodeTrans.h"
#include "dnodeTrans.h"
#include "dnodeMain.h"
#include "dnodeEps.h"
#include "dnodeMnodeEps.h"
#include "dnodeMsg.h"
#include "dnodeStatus.h"
#include "mnode.h"
#include "mnode.h"
#include "vnode.h"
#include "vnode.h"
...
@@ -97,8 +96,8 @@ static int32_t dnodeInitServer() {
...
@@ -97,8 +96,8 @@ static int32_t dnodeInitServer() {
tsTrans
.
peerMsgFp
[
TSDB_MSG_TYPE_DM_GRANT
]
=
mnodeProcessMsg
;
tsTrans
.
peerMsgFp
[
TSDB_MSG_TYPE_DM_GRANT
]
=
mnodeProcessMsg
;
tsTrans
.
peerMsgFp
[
TSDB_MSG_TYPE_DM_STATUS
]
=
mnodeProcessMsg
;
tsTrans
.
peerMsgFp
[
TSDB_MSG_TYPE_DM_STATUS
]
=
mnodeProcessMsg
;
tsTrans
.
peerMsgFp
[
TSDB_MSG_TYPE_MQ_CONNECT
]
=
vnodeProcessMsg
;
tsTrans
.
peerMsgFp
[
TSDB_MSG_TYPE_MQ_CONNECT
]
=
vnodeProcessMsg
;
/*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/
tsTrans
.
peerMsgFp
[
TSDB_MSG_TYPE_MQ_CONSUME
]
=
vnodeProcessMsg
;
SRpcInit
rpcInit
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
...
@@ -139,7 +138,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
...
@@ -139,7 +138,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
}
if
(
msgType
==
TSDB_MSG_TYPE_DM_STATUS_RSP
&&
pEpSet
)
{
if
(
msgType
==
TSDB_MSG_TYPE_DM_STATUS_RSP
&&
pEpSet
)
{
dnodeUpdateMnode
FromPeer
(
pEpSet
);
dnodeUpdateMnode
Eps
(
pEpSet
);
}
}
RpcMsgFp
fp
=
tsTrans
.
peerMsgFp
[
msgType
];
RpcMsgFp
fp
=
tsTrans
.
peerMsgFp
[
msgType
];
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录