Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
da0a115c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
da0a115c
编写于
11月 01, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
rename files
上级
7354b9e1
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
493 addition
and
763 deletion
+493
-763
source/dnode/mgmt/inc/dnodeCheck.h
source/dnode/mgmt/inc/dnodeCheck.h
+0
-31
source/dnode/mgmt/inc/dnodeConfig.h
source/dnode/mgmt/inc/dnodeConfig.h
+0
-41
source/dnode/mgmt/inc/dnodeDnode.h
source/dnode/mgmt/inc/dnodeDnode.h
+14
-0
source/dnode/mgmt/src/dnodeCheck.c
source/dnode/mgmt/src/dnodeCheck.c
+0
-191
source/dnode/mgmt/src/dnodeConfig.c
source/dnode/mgmt/src/dnodeConfig.c
+0
-415
source/dnode/mgmt/src/dnodeDnode.c
source/dnode/mgmt/src/dnodeDnode.c
+403
-35
source/dnode/mgmt/src/dnodeInt.c
source/dnode/mgmt/src/dnodeInt.c
+46
-47
source/dnode/mgmt/src/dnodeMnode.c
source/dnode/mgmt/src/dnodeMnode.c
+30
-2
source/dnode/mgmt/src/dnodeTransport.c
source/dnode/mgmt/src/dnodeTransport.c
+0
-1
未找到文件。
source/dnode/mgmt/inc/dnodeCheck.h
已删除
100644 → 0
浏览文件 @
7354b9e1
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_CHECK_H_
#define _TD_DNODE_CHECK_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "dnodeInt.h"
int32_t
dnodeInitCheck
();
void
dnodeCleanupCheck
();
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DNODE_CHECK_H_*/
source/dnode/mgmt/inc/dnodeConfig.h
已删除
100644 → 0
浏览文件 @
7354b9e1
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_CONFIG_H_
#define _TD_DNODE_CONFIG_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "dnodeInt.h"
int32_t
dnodeInitConfig
();
void
dnodeCleanupConfig
();
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
);
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
);
void
dnodeUpdateMnodeEps
(
SRpcEpSet
*
pEpSet
);
int32_t
dnodeGetDnodeId
();
int64_t
dnodeGetClusterId
();
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
);
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
);
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DNODE_CONFIG_H_*/
\ No newline at end of file
source/dnode/mgmt/inc/dnodeDnode.h
浏览文件 @
da0a115c
...
...
@@ -27,6 +27,20 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg);
void
dnodeProcessStartupReq
(
SRpcMsg
*
pMsg
);
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
);
int32_t
dnodeInitConfig
();
void
dnodeCleanupConfig
();
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
);
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
);
void
dnodeUpdateMnodeEps
(
SRpcEpSet
*
pEpSet
);
int32_t
dnodeGetDnodeId
();
int64_t
dnodeGetClusterId
();
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
);
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
);
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mgmt/src/dnodeCheck.c
已删除
100644 → 0
浏览文件 @
7354b9e1
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeCheck.h"
#define MIN_AVAIL_MEMORY_MB 32
static
int32_t
dnodeBindTcpPort
(
uint16_t
port
)
{
#if 0
SOCKET serverSocket;
struct sockaddr_in server_addr;
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
dError("failed to create tcp socket since %s", strerror(errno));
return -1;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
dError("failed to bind tcp port:%d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return -1;
}
if (listen(serverSocket, 5) < 0) {
dError("failed to listen tcp port:%d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return -1;
}
taosCloseSocket(serverSocket);
#endif
return
0
;
}
static
int32_t
dnodeBindUdpPort
(
int16_t
port
)
{
#if 0
SOCKET serverSocket;
struct sockaddr_in server_addr;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
dError("failed to create udp socket since %s", strerror(errno));
return -1;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
dError("failed to bind udp port:%d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return -1;
}
taosCloseSocket(serverSocket);
#endif
return
0
;
}
static
int32_t
dnodeCheckNetwork
()
{
int32_t
ret
;
uint16_t
startPort
=
tsServerPort
;
for
(
uint16_t
port
=
startPort
;
port
<
startPort
+
12
;
port
++
)
{
ret
=
dnodeBindTcpPort
(
port
);
if
(
0
!=
ret
)
{
dError
(
"failed to bind tcp port:%d"
,
port
);
return
-
1
;
}
ret
=
dnodeBindUdpPort
(
port
);
if
(
0
!=
ret
)
{
dError
(
"failed to bind udp port:%d"
,
port
);
return
-
1
;
}
}
return
0
;
}
static
int32_t
dnodeCheckMem
()
{
#if 0
float memoryUsedMB;
float memoryAvailMB;
if (true != taosGetSysMemory(&memoryUsedMB)) {
dError("failed to get system memory since %s, errno:%u,", strerror(errno), errno);
return -1;
}
memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB;
if (memoryAvailMB < MIN_AVAIL_MEMORY_MB) {
dError("available memory %fMB less than the threshold %dMB", memoryAvailMB, MIN_AVAIL_MEMORY_MB);
return -1;
}
#endif
return
0
;
}
static
int32_t
dnodeCheckDisk
()
{
#if 0
taosGetDisk();
if (tsAvailDataDirGB < tsMinimalDataDirGB) {
dError("dataDir disk size:%fGB less than threshold %fGB ", tsAvailDataDirGB, tsMinimalDataDirGB);
return -1;
}
if (tsAvailLogDirGB < tsMinimalLogDirGB) {
dError("logDir disk size:%fGB less than threshold %fGB", tsAvailLogDirGB, tsMinimalLogDirGB);
return -1;
}
if (tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
dError("tmpDir disk size:%fGB less than threshold %fGB", tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
return -1;
}
#endif
return
0
;
}
static
int32_t
dnodeCheckCpu
()
{
return
0
;
}
static
int32_t
dnodeCheckOs
()
{
return
0
;
}
static
int32_t
dnodeCheckAccess
()
{
return
0
;
}
static
int32_t
dnodeCheckVersion
()
{
return
0
;
}
static
int32_t
dnodeCheckDatafile
()
{
return
0
;
}
int32_t
dnodeInitCheck
()
{
if
(
dnodeCheckNetwork
()
!=
0
)
{
dError
(
"failed to check network"
);
return
-
1
;
}
if
(
dnodeCheckMem
()
!=
0
)
{
dError
(
"failed to check memory"
);
return
-
1
;
}
if
(
dnodeCheckCpu
()
!=
0
)
{
dError
(
"failed to check cpu"
);
return
-
1
;
}
if
(
dnodeCheckDisk
()
!=
0
)
{
dError
(
"failed to check disk"
);
return
-
1
;
}
if
(
dnodeCheckOs
()
!=
0
)
{
dError
(
"failed to check os"
);
return
-
1
;
}
if
(
dnodeCheckAccess
()
!=
0
)
{
dError
(
"failed to check access"
);
return
-
1
;
}
if
(
dnodeCheckVersion
()
!=
0
)
{
dError
(
"failed to check version"
);
return
-
1
;
}
if
(
dnodeCheckDatafile
()
!=
0
)
{
dError
(
"failed to check datafile"
);
return
-
1
;
}
dInfo
(
"dnode check is finished"
);
return
0
;
}
void
dnodeCleanupCheck
()
{}
\ No newline at end of file
source/dnode/mgmt/src/dnodeConfig.c
已删除
100644 → 0
浏览文件 @
7354b9e1
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeConfig.h"
#include "cJSON.h"
#include "thash.h"
static
struct
{
int32_t
dnodeId
;
int32_t
dropped
;
int64_t
clusterId
;
SDnodeEps
*
dnodeEps
;
SHashObj
*
dnodeHash
;
SRpcEpSet
mnodeEpSetForShell
;
SRpcEpSet
mnodeEpSetForPeer
;
char
file
[
PATH_MAX
+
20
];
pthread_mutex_t
mutex
;
}
tsConfig
;
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
*
epSet
=
tsConfig
.
mnodeEpSetForPeer
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
static
void
dnodeGetEpSetForShell
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
*
epSet
=
tsConfig
.
mnodeEpSetForShell
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeUpdateMnodeEps
(
SRpcEpSet
*
ep
)
{
if
(
ep
!=
NULL
||
ep
->
numOfEps
<=
0
)
{
dError
(
"mnode is changed, but content is invalid, discard it"
);
return
;
}
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
dInfo
(
"mnode is changed, num:%d use:%d"
,
ep
->
numOfEps
,
ep
->
inUse
);
tsConfig
.
mnodeEpSetForPeer
=
*
ep
;
for
(
int32_t
i
=
0
;
i
<
ep
->
numOfEps
;
++
i
)
{
ep
->
port
[
i
]
-=
TSDB_PORT_DNODEDNODE
;
dInfo
(
"mnode index:%d %s:%u"
,
i
,
ep
->
fqdn
[
i
],
ep
->
port
[
i
]);
}
tsConfig
.
mnodeEpSetForShell
=
*
ep
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
)
{
SRpcConnInfo
connInfo
=
{
0
};
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
);
SRpcEpSet
epSet
=
{
0
};
if
(
forShell
)
{
dnodeGetEpSetForShell
(
&
epSet
);
}
else
{
dnodeGetEpSetForPeer
(
&
epSet
);
}
dDebug
(
"msg:%s will be redirected, num:%d use:%d"
,
taosMsg
[
rpcMsg
->
msgType
],
epSet
.
numOfEps
,
epSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
dDebug
(
"mnode index:%d %s:%d"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
]);
if
(
strcmp
(
epSet
.
fqdn
[
i
],
tsLocalFqdn
)
==
0
)
{
if
((
epSet
.
port
[
i
]
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
&&
!
forShell
)
||
(
epSet
.
port
[
i
]
==
tsServerPort
&&
forShell
))
{
epSet
.
inUse
=
(
i
+
1
)
%
epSet
.
numOfEps
;
dDebug
(
"mnode index:%d %s:%d set inUse to %d"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
],
epSet
.
inUse
);
}
}
epSet
.
port
[
i
]
=
htons
(
epSet
.
port
[
i
]);
}
rpcSendRedirectRsp
(
rpcMsg
->
handle
,
&
epSet
);
}
static
void
dnodePrintEps
()
{
dDebug
(
"print dnode list, num:%d"
,
tsConfig
.
dnodeEps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
dDebug
(
"dnode:%d, fqdn:%s port:%u isMnode:%d"
,
ep
->
dnodeId
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
,
ep
->
isMnode
);
}
}
static
void
dnodeResetEps
(
SDnodeEps
*
data
)
{
assert
(
data
!=
NULL
);
int32_t
size
=
sizeof
(
SDnodeEps
)
+
data
->
dnodeNum
*
sizeof
(
SDnodeEp
);
if
(
data
->
dnodeNum
>
tsConfig
.
dnodeEps
->
dnodeNum
)
{
SDnodeEps
*
tmp
=
calloc
(
1
,
size
);
if
(
tmp
==
NULL
)
return
;
tfree
(
tsConfig
.
dnodeEps
);
tsConfig
.
dnodeEps
=
tmp
;
}
if
(
tsConfig
.
dnodeEps
!=
data
)
{
memcpy
(
tsConfig
.
dnodeEps
,
data
,
size
);
}
tsConfig
.
mnodeEpSetForPeer
.
inUse
=
0
;
tsConfig
.
mnodeEpSetForShell
.
inUse
=
0
;
int32_t
index
=
0
;
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
if
(
!
ep
->
isMnode
)
continue
;
if
(
index
>=
TSDB_MAX_REPLICA
)
continue
;
strcpy
(
tsConfig
.
mnodeEpSetForShell
.
fqdn
[
index
],
ep
->
dnodeFqdn
);
strcpy
(
tsConfig
.
mnodeEpSetForPeer
.
fqdn
[
index
],
ep
->
dnodeFqdn
);
tsConfig
.
mnodeEpSetForShell
.
port
[
index
]
=
ep
->
dnodePort
;
tsConfig
.
mnodeEpSetForShell
.
port
[
index
]
=
ep
->
dnodePort
+
tsDnodeDnodePort
;
index
++
;
}
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
taosHashPut
(
tsConfig
.
dnodeHash
,
&
ep
->
dnodeId
,
sizeof
(
int32_t
),
ep
,
sizeof
(
SDnodeEp
));
}
dnodePrintEps
();
}
static
bool
dnodeIsDnodeEpChanged
(
int32_t
dnodeId
,
char
*
epstr
)
{
bool
changed
=
false
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
SDnodeEp
*
ep
=
taosHashGet
(
tsConfig
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
ep
!=
NULL
)
{
char
epSaved
[
TSDB_EP_LEN
+
1
];
snprintf
(
epSaved
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
changed
=
strcmp
(
epstr
,
epSaved
)
!=
0
;
tstrncpy
(
epstr
,
epSaved
,
TSDB_EP_LEN
);
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
return
changed
;
}
static
int32_t
dnodeReadEps
()
{
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
fp
=
fopen
(
tsConfig
.
file
,
"r"
);
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
root
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dnodeId not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
tsConfig
.
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dropped not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
tsConfig
.
dropped
=
atoi
(
dropped
->
valuestring
);
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterId"
);
if
(
!
clusterId
||
clusterId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since clusterId not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
tsConfig
.
clusterId
=
atoll
(
clusterId
->
valuestring
);
cJSON
*
dnodeInfos
=
cJSON_GetObjectItem
(
root
,
"dnodeInfos"
);
if
(
!
dnodeInfos
||
dnodeInfos
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since dnodeInfos not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
int32_t
dnodeInfosSize
=
cJSON_GetArraySize
(
dnodeInfos
);
if
(
dnodeInfosSize
<=
0
)
{
dError
(
"failed to read %s since dnodeInfos size:%d invalid"
,
tsConfig
.
file
,
dnodeInfosSize
);
goto
PRASE_EPS_OVER
;
}
tsConfig
.
dnodeEps
=
calloc
(
1
,
dnodeInfosSize
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
));
if
(
tsConfig
.
dnodeEps
==
NULL
)
{
dError
(
"failed to calloc dnodeEpList since %s"
,
strerror
(
errno
));
goto
PRASE_EPS_OVER
;
}
tsConfig
.
dnodeEps
->
dnodeNum
=
dnodeInfosSize
;
for
(
int32_t
i
=
0
;
i
<
dnodeInfosSize
;
++
i
)
{
cJSON
*
dnodeInfo
=
cJSON_GetArrayItem
(
dnodeInfos
,
i
);
if
(
dnodeInfo
==
NULL
)
break
;
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodeId not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
ep
->
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
isMnode
=
cJSON_GetObjectItem
(
dnodeInfo
,
"isMnode"
);
if
(
!
isMnode
||
isMnode
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, isMnode not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
ep
->
isMnode
=
atoi
(
isMnode
->
valuestring
);
cJSON
*
dnodeFqdn
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeFqdn"
);
if
(
!
dnodeFqdn
||
dnodeFqdn
->
type
!=
cJSON_String
||
dnodeFqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s, dnodeFqdn not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
tstrncpy
(
ep
->
dnodeFqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
dnodePort
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodePort"
);
if
(
!
dnodePort
||
dnodePort
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodePort not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
ep
->
dnodePort
=
atoi
(
dnodePort
->
valuestring
);
}
dInfo
(
"succcessed to read file %s"
,
tsConfig
.
file
);
dnodePrintEps
();
PRASE_EPS_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
if
(
dnodeIsDnodeEpChanged
(
tsConfig
.
dnodeId
,
tsLocalEp
))
{
dError
(
"dnode:%d, localEp %s different with dnodeEps.json and need reconfigured"
,
tsConfig
.
dnodeId
,
tsLocalEp
);
return
-
1
;
}
dnodeResetEps
(
tsConfig
.
dnodeEps
);
terrno
=
0
;
return
0
;
}
static
int32_t
dnodeWriteEps
()
{
FILE
*
fp
=
fopen
(
tsConfig
.
file
,
"w"
);
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
tsConfig
.
file
,
strerror
(
errno
));
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
tsConfig
.
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
:
\"
%d
\"
,
\n
"
,
tsConfig
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
tsConfig
.
clusterId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeInfos
\"
: [{
\n
"
);
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
ep
->
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
isMnode
\"
:
\"
%d
\"
,
\n
"
,
ep
->
isMnode
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeFqdn
\"
:
\"
%s
\"
,
\n
"
,
ep
->
dnodeFqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodePort
\"
:
\"
%u
\"\n
"
,
ep
->
dnodePort
);
if
(
i
<
tsConfig
.
dnodeEps
->
dnodeNum
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }]
\n
"
);
}
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
dInfo
(
"successed to write %s"
,
tsConfig
.
file
);
return
0
;
}
int32_t
dnodeInitConfig
()
{
tsConfig
.
dnodeId
=
0
;
tsConfig
.
dropped
=
0
;
tsConfig
.
clusterId
=
0
;
tsConfig
.
dnodeEps
=
NULL
;
snprintf
(
tsConfig
.
file
,
sizeof
(
tsConfig
.
file
),
"%s/dnodeEps.json"
,
tsDnodeDir
);
pthread_mutex_init
(
&
tsConfig
.
mutex
,
NULL
);
tsConfig
.
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tsConfig
.
dnodeHash
==
NULL
)
return
-
1
;
int32_t
ret
=
dnodeReadEps
();
if
(
ret
==
0
)
{
dInfo
(
"dnode eps is initialized"
);
}
return
ret
;
}
void
dnodeCleanupConfig
()
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
if
(
tsConfig
.
dnodeEps
!=
NULL
)
{
free
(
tsConfig
.
dnodeEps
);
tsConfig
.
dnodeEps
=
NULL
;
}
if
(
tsConfig
.
dnodeHash
)
{
taosHashCleanup
(
tsConfig
.
dnodeHash
);
tsConfig
.
dnodeHash
=
NULL
;
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
pthread_mutex_destroy
(
&
tsConfig
.
mutex
);
}
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
)
{
if
(
data
==
NULL
||
data
->
dnodeNum
<=
0
)
return
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
if
(
data
->
dnodeNum
!=
tsConfig
.
dnodeEps
->
dnodeNum
)
{
dnodeResetEps
(
data
);
dnodeWriteEps
();
}
else
{
int32_t
size
=
data
->
dnodeNum
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
);
if
(
memcmp
(
tsConfig
.
dnodeEps
,
data
,
size
)
!=
0
)
{
dnodeResetEps
(
data
);
dnodeWriteEps
();
}
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
SDnodeEp
*
ep
=
taosHashGet
(
tsConfig
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
ep
!=
NULL
)
{
if
(
port
)
*
port
=
ep
->
dnodePort
;
if
(
fqdn
)
tstrncpy
(
fqdn
,
ep
->
dnodeFqdn
,
TSDB_FQDN_LEN
);
if
(
epstr
)
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
)
{
if
(
tsConfig
.
dnodeId
!=
0
&&
!
data
->
dropped
)
return
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
tsConfig
.
dnodeId
=
data
->
dnodeId
;
tsConfig
.
clusterId
=
data
->
clusterId
;
tsConfig
.
dropped
=
data
->
dropped
;
dInfo
(
"dnodeId is set to %d, clusterId is set to %"
PRId64
,
data
->
dnodeId
,
data
->
clusterId
);
dnodeWriteEps
();
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
int32_t
dnodeGetDnodeId
()
{
int32_t
dnodeId
=
0
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
dnodeId
=
tsConfig
.
dnodeId
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
return
dnodeId
;
}
int64_t
dnodeGetClusterId
()
{
int64_t
clusterId
=
0
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
clusterId
=
tsConfig
.
clusterId
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
return
clusterId
;
}
source/dnode/mgmt/src/dnodeDnode.c
浏览文件 @
da0a115c
...
...
@@ -15,15 +15,411 @@
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "dnodeConfig.h"
#include "mnode.h"
#include "dnodeTransport.h"
#include "tthread.h"
#include "ttime.h"
#include "vnode.h"
#include "cJSON.h"
#include "thash.h"
static
struct
{
int32_t
dnodeId
;
int32_t
dropped
;
int64_t
clusterId
;
SDnodeEps
*
dnodeEps
;
SHashObj
*
dnodeHash
;
SRpcEpSet
mnodeEpSetForShell
;
SRpcEpSet
mnodeEpSetForPeer
;
char
file
[
PATH_MAX
+
20
];
pthread_mutex_t
mutex
;
}
tsConfig
;
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
*
epSet
=
tsConfig
.
mnodeEpSetForPeer
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
static
void
dnodeGetEpSetForShell
(
SRpcEpSet
*
epSet
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
*
epSet
=
tsConfig
.
mnodeEpSetForShell
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeUpdateMnodeEps
(
SRpcEpSet
*
ep
)
{
if
(
ep
!=
NULL
||
ep
->
numOfEps
<=
0
)
{
dError
(
"mnode is changed, but content is invalid, discard it"
);
return
;
}
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
dInfo
(
"mnode is changed, num:%d use:%d"
,
ep
->
numOfEps
,
ep
->
inUse
);
tsConfig
.
mnodeEpSetForPeer
=
*
ep
;
for
(
int32_t
i
=
0
;
i
<
ep
->
numOfEps
;
++
i
)
{
ep
->
port
[
i
]
-=
TSDB_PORT_DNODEDNODE
;
dInfo
(
"mnode index:%d %s:%u"
,
i
,
ep
->
fqdn
[
i
],
ep
->
port
[
i
]);
}
tsConfig
.
mnodeEpSetForShell
=
*
ep
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeSendRedirectMsg
(
SRpcMsg
*
rpcMsg
,
bool
forShell
)
{
SRpcConnInfo
connInfo
=
{
0
};
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
);
SRpcEpSet
epSet
=
{
0
};
if
(
forShell
)
{
dnodeGetEpSetForShell
(
&
epSet
);
}
else
{
dnodeGetEpSetForPeer
(
&
epSet
);
}
dDebug
(
"msg:%s will be redirected, num:%d use:%d"
,
taosMsg
[
rpcMsg
->
msgType
],
epSet
.
numOfEps
,
epSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
dDebug
(
"mnode index:%d %s:%d"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
]);
if
(
strcmp
(
epSet
.
fqdn
[
i
],
tsLocalFqdn
)
==
0
)
{
if
((
epSet
.
port
[
i
]
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
&&
!
forShell
)
||
(
epSet
.
port
[
i
]
==
tsServerPort
&&
forShell
))
{
epSet
.
inUse
=
(
i
+
1
)
%
epSet
.
numOfEps
;
dDebug
(
"mnode index:%d %s:%d set inUse to %d"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
],
epSet
.
inUse
);
}
}
epSet
.
port
[
i
]
=
htons
(
epSet
.
port
[
i
]);
}
rpcSendRedirectRsp
(
rpcMsg
->
handle
,
&
epSet
);
}
static
void
dnodePrintEps
()
{
dDebug
(
"print dnode list, num:%d"
,
tsConfig
.
dnodeEps
->
dnodeNum
);
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
dDebug
(
"dnode:%d, fqdn:%s port:%u isMnode:%d"
,
ep
->
dnodeId
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
,
ep
->
isMnode
);
}
}
static
void
dnodeResetEps
(
SDnodeEps
*
data
)
{
assert
(
data
!=
NULL
);
int32_t
size
=
sizeof
(
SDnodeEps
)
+
data
->
dnodeNum
*
sizeof
(
SDnodeEp
);
if
(
data
->
dnodeNum
>
tsConfig
.
dnodeEps
->
dnodeNum
)
{
SDnodeEps
*
tmp
=
calloc
(
1
,
size
);
if
(
tmp
==
NULL
)
return
;
tfree
(
tsConfig
.
dnodeEps
);
tsConfig
.
dnodeEps
=
tmp
;
}
if
(
tsConfig
.
dnodeEps
!=
data
)
{
memcpy
(
tsConfig
.
dnodeEps
,
data
,
size
);
}
tsConfig
.
mnodeEpSetForPeer
.
inUse
=
0
;
tsConfig
.
mnodeEpSetForShell
.
inUse
=
0
;
int32_t
index
=
0
;
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
i
++
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
if
(
!
ep
->
isMnode
)
continue
;
if
(
index
>=
TSDB_MAX_REPLICA
)
continue
;
strcpy
(
tsConfig
.
mnodeEpSetForShell
.
fqdn
[
index
],
ep
->
dnodeFqdn
);
strcpy
(
tsConfig
.
mnodeEpSetForPeer
.
fqdn
[
index
],
ep
->
dnodeFqdn
);
tsConfig
.
mnodeEpSetForShell
.
port
[
index
]
=
ep
->
dnodePort
;
tsConfig
.
mnodeEpSetForShell
.
port
[
index
]
=
ep
->
dnodePort
+
tsDnodeDnodePort
;
index
++
;
}
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
taosHashPut
(
tsConfig
.
dnodeHash
,
&
ep
->
dnodeId
,
sizeof
(
int32_t
),
ep
,
sizeof
(
SDnodeEp
));
}
dnodePrintEps
();
}
static
bool
dnodeIsDnodeEpChanged
(
int32_t
dnodeId
,
char
*
epstr
)
{
bool
changed
=
false
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
SDnodeEp
*
ep
=
taosHashGet
(
tsConfig
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
ep
!=
NULL
)
{
char
epSaved
[
TSDB_EP_LEN
+
1
];
snprintf
(
epSaved
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
changed
=
strcmp
(
epstr
,
epSaved
)
!=
0
;
tstrncpy
(
epstr
,
epSaved
,
TSDB_EP_LEN
);
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
return
changed
;
}
static
int32_t
dnodeReadEps
()
{
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
fp
=
fopen
(
tsConfig
.
file
,
"r"
);
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
root
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dnodeId not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
tsConfig
.
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dropped not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
tsConfig
.
dropped
=
atoi
(
dropped
->
valuestring
);
cJSON
*
clusterId
=
cJSON_GetObjectItem
(
root
,
"clusterId"
);
if
(
!
clusterId
||
clusterId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since clusterId not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
tsConfig
.
clusterId
=
atoll
(
clusterId
->
valuestring
);
cJSON
*
dnodeInfos
=
cJSON_GetObjectItem
(
root
,
"dnodeInfos"
);
if
(
!
dnodeInfos
||
dnodeInfos
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since dnodeInfos not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
int32_t
dnodeInfosSize
=
cJSON_GetArraySize
(
dnodeInfos
);
if
(
dnodeInfosSize
<=
0
)
{
dError
(
"failed to read %s since dnodeInfos size:%d invalid"
,
tsConfig
.
file
,
dnodeInfosSize
);
goto
PRASE_EPS_OVER
;
}
tsConfig
.
dnodeEps
=
calloc
(
1
,
dnodeInfosSize
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
));
if
(
tsConfig
.
dnodeEps
==
NULL
)
{
dError
(
"failed to calloc dnodeEpList since %s"
,
strerror
(
errno
));
goto
PRASE_EPS_OVER
;
}
tsConfig
.
dnodeEps
->
dnodeNum
=
dnodeInfosSize
;
for
(
int32_t
i
=
0
;
i
<
dnodeInfosSize
;
++
i
)
{
cJSON
*
dnodeInfo
=
cJSON_GetArrayItem
(
dnodeInfos
,
i
);
if
(
dnodeInfo
==
NULL
)
break
;
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
cJSON
*
dnodeId
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeId"
);
if
(
!
dnodeId
||
dnodeId
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodeId not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
ep
->
dnodeId
=
atoi
(
dnodeId
->
valuestring
);
cJSON
*
isMnode
=
cJSON_GetObjectItem
(
dnodeInfo
,
"isMnode"
);
if
(
!
isMnode
||
isMnode
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, isMnode not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
ep
->
isMnode
=
atoi
(
isMnode
->
valuestring
);
cJSON
*
dnodeFqdn
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodeFqdn"
);
if
(
!
dnodeFqdn
||
dnodeFqdn
->
type
!=
cJSON_String
||
dnodeFqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s, dnodeFqdn not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
tstrncpy
(
ep
->
dnodeFqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
dnodePort
=
cJSON_GetObjectItem
(
dnodeInfo
,
"dnodePort"
);
if
(
!
dnodePort
||
dnodePort
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s, dnodePort not found"
,
tsConfig
.
file
);
goto
PRASE_EPS_OVER
;
}
ep
->
dnodePort
=
atoi
(
dnodePort
->
valuestring
);
}
dInfo
(
"succcessed to read file %s"
,
tsConfig
.
file
);
dnodePrintEps
();
PRASE_EPS_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
if
(
dnodeIsDnodeEpChanged
(
tsConfig
.
dnodeId
,
tsLocalEp
))
{
dError
(
"dnode:%d, localEp %s different with dnodeEps.json and need reconfigured"
,
tsConfig
.
dnodeId
,
tsLocalEp
);
return
-
1
;
}
dnodeResetEps
(
tsConfig
.
dnodeEps
);
terrno
=
0
;
return
0
;
}
static
int32_t
dnodeWriteEps
()
{
FILE
*
fp
=
fopen
(
tsConfig
.
file
,
"w"
);
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
tsConfig
.
file
,
strerror
(
errno
));
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
tsConfig
.
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
:
\"
%d
\"
,
\n
"
,
tsConfig
.
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
clusterId
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
tsConfig
.
clusterId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeInfos
\"
: [{
\n
"
);
for
(
int32_t
i
=
0
;
i
<
tsConfig
.
dnodeEps
->
dnodeNum
;
++
i
)
{
SDnodeEp
*
ep
=
&
tsConfig
.
dnodeEps
->
dnodeEps
[
i
];
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeId
\"
:
\"
%d
\"
,
\n
"
,
ep
->
dnodeId
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
isMnode
\"
:
\"
%d
\"
,
\n
"
,
ep
->
isMnode
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodeFqdn
\"
:
\"
%s
\"
,
\n
"
,
ep
->
dnodeFqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dnodePort
\"
:
\"
%u
\"\n
"
,
ep
->
dnodePort
);
if
(
i
<
tsConfig
.
dnodeEps
->
dnodeNum
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }]
\n
"
);
}
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
dInfo
(
"successed to write %s"
,
tsConfig
.
file
);
return
0
;
}
int32_t
dnodeInitConfig
()
{
tsConfig
.
dnodeId
=
0
;
tsConfig
.
dropped
=
0
;
tsConfig
.
clusterId
=
0
;
tsConfig
.
dnodeEps
=
NULL
;
snprintf
(
tsConfig
.
file
,
sizeof
(
tsConfig
.
file
),
"%s/dnodeEps.json"
,
tsDnodeDir
);
pthread_mutex_init
(
&
tsConfig
.
mutex
,
NULL
);
tsConfig
.
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tsConfig
.
dnodeHash
==
NULL
)
return
-
1
;
int32_t
ret
=
dnodeReadEps
();
if
(
ret
==
0
)
{
dInfo
(
"dnode eps is initialized"
);
}
return
ret
;
}
void
dnodeCleanupConfig
()
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
if
(
tsConfig
.
dnodeEps
!=
NULL
)
{
free
(
tsConfig
.
dnodeEps
);
tsConfig
.
dnodeEps
=
NULL
;
}
if
(
tsConfig
.
dnodeHash
)
{
taosHashCleanup
(
tsConfig
.
dnodeHash
);
tsConfig
.
dnodeHash
=
NULL
;
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
pthread_mutex_destroy
(
&
tsConfig
.
mutex
);
}
void
dnodeUpdateDnodeEps
(
SDnodeEps
*
data
)
{
if
(
data
==
NULL
||
data
->
dnodeNum
<=
0
)
return
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
if
(
data
->
dnodeNum
!=
tsConfig
.
dnodeEps
->
dnodeNum
)
{
dnodeResetEps
(
data
);
dnodeWriteEps
();
}
else
{
int32_t
size
=
data
->
dnodeNum
*
sizeof
(
SDnodeEp
)
+
sizeof
(
SDnodeEps
);
if
(
memcmp
(
tsConfig
.
dnodeEps
,
data
,
size
)
!=
0
)
{
dnodeResetEps
(
data
);
dnodeWriteEps
();
}
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
epstr
,
char
*
fqdn
,
uint16_t
*
port
)
{
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
SDnodeEp
*
ep
=
taosHashGet
(
tsConfig
.
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
ep
!=
NULL
)
{
if
(
port
)
*
port
=
ep
->
dnodePort
;
if
(
fqdn
)
tstrncpy
(
fqdn
,
ep
->
dnodeFqdn
,
TSDB_FQDN_LEN
);
if
(
epstr
)
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
ep
->
dnodeFqdn
,
ep
->
dnodePort
);
}
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
void
dnodeUpdateCfg
(
SDnodeCfg
*
data
)
{
if
(
tsConfig
.
dnodeId
!=
0
&&
!
data
->
dropped
)
return
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
tsConfig
.
dnodeId
=
data
->
dnodeId
;
tsConfig
.
clusterId
=
data
->
clusterId
;
tsConfig
.
dropped
=
data
->
dropped
;
dInfo
(
"dnodeId is set to %d, clusterId is set to %"
PRId64
,
data
->
dnodeId
,
data
->
clusterId
);
dnodeWriteEps
();
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
}
int32_t
dnodeGetDnodeId
()
{
int32_t
dnodeId
=
0
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
dnodeId
=
tsConfig
.
dnodeId
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
return
dnodeId
;
}
int64_t
dnodeGetClusterId
()
{
int64_t
clusterId
=
0
;
pthread_mutex_lock
(
&
tsConfig
.
mutex
);
clusterId
=
tsConfig
.
clusterId
;
pthread_mutex_unlock
(
&
tsConfig
.
mutex
);
return
clusterId
;
}
static
struct
{
pthread_t
*
threadId
;
bool
s
top
;
bool
threadS
top
;
uint32_t
rebootTime
;
}
tsDnode
;
...
...
@@ -93,14 +489,14 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
static
void
*
dnodeThreadRoutine
(
void
*
param
)
{
int32_t
ms
=
tsStatusInterval
*
1000
;
while
(
!
tsDnode
.
s
top
)
{
while
(
!
tsDnode
.
threadS
top
)
{
taosMsleep
(
ms
);
dnodeSendStatusMsg
();
}
}
int32_t
dnodeInitDnode
()
{
tsDnode
.
s
top
=
false
;
tsDnode
.
threadS
top
=
false
;
tsDnode
.
rebootTime
=
taosGetTimestampSec
();
tsDnode
.
threadId
=
taosCreateThread
(
dnodeThreadRoutine
,
NULL
);
if
(
tsDnode
.
threadId
==
NULL
)
{
...
...
@@ -113,7 +509,7 @@ int32_t dnodeInitDnode() {
void
dnodeCleanupDnode
()
{
if
(
tsDnode
.
threadId
!=
NULL
)
{
tsDnode
.
s
top
=
true
;
tsDnode
.
threadS
top
=
true
;
taosDestoryThread
(
tsDnode
.
threadId
);
tsDnode
.
threadId
=
NULL
;
}
...
...
@@ -121,34 +517,6 @@ void dnodeCleanupDnode() {
dInfo
(
"dnode msg is cleanuped"
);
}
static
int32_t
dnodeStartMnode
(
SRpcMsg
*
pMsg
)
{
SCreateMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
mnodeNum
=
htonl
(
pCfg
->
mnodeNum
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
mnodeNum
;
++
i
)
{
pCfg
->
mnodeEps
[
i
].
dnodeId
=
htonl
(
pCfg
->
mnodeEps
[
i
].
dnodeId
);
pCfg
->
mnodeEps
[
i
].
dnodePort
=
htons
(
pCfg
->
mnodeEps
[
i
].
dnodePort
);
}
if
(
pCfg
->
dnodeId
!=
dnodeGetDnodeId
())
{
dDebug
(
"dnode:%d, in create meps msg is not equal with saved dnodeId:%d"
,
pCfg
->
dnodeId
,
dnodeGetDnodeId
());
return
TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED
;
}
if
(
mnodeGetStatus
()
==
MN_STATUS_READY
)
return
0
;
return
mnodeDeploy
();
}
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
dnodeStartMnode
(
pMsg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
void
dnodeProcessConfigDnodeReq
(
SRpcMsg
*
pMsg
)
{
SCfgDnodeMsg
*
pCfg
=
pMsg
->
pCont
;
...
...
source/dnode/mgmt/src/dnodeInt.c
浏览文件 @
da0a115c
...
...
@@ -14,8 +14,6 @@
*/
#define _DEFAULT_SOURCE
#include "dnodeCheck.h"
#include "dnodeConfig.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeTransport.h"
...
...
@@ -53,46 +51,6 @@ static void dnodeReportStartupFinished(char *name, char *desc) {
void
dnodeGetStartup
(
SStartupStep
*
pStep
)
{
memcpy
(
pStep
,
&
tsInt
.
startup
,
sizeof
(
SStartupStep
));
}
static
int32_t
dnodeInitMain
()
{
tsInt
.
runStatus
=
DN_RUN_STAT_STOPPED
;
tscEmbedded
=
1
;
taosIgnSIGPIPE
();
taosBlockSIGPIPE
();
taosResolveCRC
();
taosInitGlobalCfg
();
taosReadGlobalLogCfg
();
taosSetCoreDump
(
tsEnableCoreFile
);
if
(
!
taosMkDir
(
tsLogDir
))
{
printf
(
"failed to create dir: %s, reason: %s
\n
"
,
tsLogDir
,
strerror
(
errno
));
return
-
1
;
}
char
temp
[
TSDB_FILENAME_LEN
];
sprintf
(
temp
,
"%s/taosdlog"
,
tsLogDir
);
if
(
taosInitLog
(
temp
,
tsNumOfLogLines
,
1
)
<
0
)
{
printf
(
"failed to init log file
\n
"
);
}
if
(
!
taosReadGlobalCfg
())
{
taosPrintGlobalCfg
();
dError
(
"TDengine read global config failed"
);
return
-
1
;
}
dInfo
(
"start to initialize TDengine"
);
taosInitNotes
();
return
taosCheckGlobalCfg
();
}
static
void
dnodeCleanupMain
()
{
taos_cleanup
();
taosCloseLog
();
taosStopCacheRefreshWorker
();
}
static
int32_t
dnodeCheckRunning
(
char
*
dir
)
{
char
filepath
[
256
]
=
{
0
};
snprintf
(
filepath
,
sizeof
(
filepath
),
"%s/.running"
,
dir
);
...
...
@@ -140,24 +98,65 @@ static int32_t dnodeInitDir() {
return
0
;
}
static
void
dnodeCleanupDir
()
{}
static
int32_t
dnodeInitMain
()
{
tsInt
.
runStatus
=
DN_RUN_STAT_STOPPED
;
tscEmbedded
=
1
;
taosIgnSIGPIPE
();
taosBlockSIGPIPE
();
taosResolveCRC
();
taosInitGlobalCfg
();
taosReadGlobalLogCfg
();
taosSetCoreDump
(
tsEnableCoreFile
);
if
(
!
taosMkDir
(
tsLogDir
))
{
printf
(
"failed to create dir: %s, reason: %s
\n
"
,
tsLogDir
,
strerror
(
errno
));
return
-
1
;
}
char
temp
[
TSDB_FILENAME_LEN
];
sprintf
(
temp
,
"%s/taosdlog"
,
tsLogDir
);
if
(
taosInitLog
(
temp
,
tsNumOfLogLines
,
1
)
<
0
)
{
printf
(
"failed to init log file
\n
"
);
}
if
(
!
taosReadGlobalCfg
())
{
taosPrintGlobalCfg
();
dError
(
"TDengine read global config failed"
);
return
-
1
;
}
dInfo
(
"start to initialize TDengine"
);
taosInitNotes
();
if
(
taosCheckGlobalCfg
()
!=
0
)
{
return
-
1
;
}
dnodeInitDir
();
return
-
1
;
}
static
void
dnodeCleanupMain
()
{
taos_cleanup
();
taosCloseLog
();
taosStopCacheRefreshWorker
();
}
int32_t
dnodeInit
()
{
SSteps
*
steps
=
taosStepInit
(
24
,
dnodeReportStartup
);
if
(
steps
==
NULL
)
return
-
1
;
taosStepAdd
(
steps
,
"dnode-main"
,
dnodeInitMain
,
dnodeCleanupMain
);
taosStepAdd
(
steps
,
"dnode-dir"
,
dnodeInitDir
,
dnodeCleanupDir
);
taosStepAdd
(
steps
,
"dnode-check"
,
dnodeInitCheck
,
dnodeCleanupCheck
);
taosStepAdd
(
steps
,
"dnode-rpc"
,
rpcInit
,
rpcCleanup
);
taosStepAdd
(
steps
,
"dnode-tfs"
,
NULL
,
NULL
);
taosStepAdd
(
steps
,
"dnode-wal"
,
walInit
,
walCleanUp
);
taosStepAdd
(
steps
,
"dnode-sync"
,
syncInit
,
syncCleanUp
);
taosStepAdd
(
steps
,
"dnode-
config"
,
dnodeInitConfig
,
dnodeCleanupConfig
);
taosStepAdd
(
steps
,
"dnode-
dnode"
,
dnodeInitDnode
,
dnodeCleanupDnode
);
taosStepAdd
(
steps
,
"dnode-vnodes"
,
dnodeInitVnodes
,
dnodeCleanupVnodes
);
taosStepAdd
(
steps
,
"dnode-mnode"
,
dnodeInitMnode
,
dnodeCleanupMnode
);
taosStepAdd
(
steps
,
"dnode-trans"
,
dnodeInitTrans
,
dnodeCleanupTrans
);
taosStepAdd
(
steps
,
"dnode-dnode"
,
dnodeInitDnode
,
dnodeCleanupDnode
);
tsInt
.
steps
=
steps
;
taosStepExec
(
tsInt
.
steps
);
...
...
source/dnode/mgmt/src/dnodeMnode.c
浏览文件 @
da0a115c
...
...
@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "dnodeMnode.h"
#include "dnode
Config
.h"
#include "dnode
Dnode
.h"
#include "dnodeTransport.h"
#include "mnode.h"
...
...
@@ -32,3 +32,31 @@ int32_t dnodeInitMnode() {
}
void
dnodeCleanupMnode
()
{
mnodeCleanup
();
}
static
int32_t
dnodeStartMnode
(
SRpcMsg
*
pMsg
)
{
SCreateMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
mnodeNum
=
htonl
(
pCfg
->
mnodeNum
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
mnodeNum
;
++
i
)
{
pCfg
->
mnodeEps
[
i
].
dnodeId
=
htonl
(
pCfg
->
mnodeEps
[
i
].
dnodeId
);
pCfg
->
mnodeEps
[
i
].
dnodePort
=
htons
(
pCfg
->
mnodeEps
[
i
].
dnodePort
);
}
if
(
pCfg
->
dnodeId
!=
dnodeGetDnodeId
())
{
dDebug
(
"dnode:%d, in create meps msg is not equal with saved dnodeId:%d"
,
pCfg
->
dnodeId
,
dnodeGetDnodeId
());
return
TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED
;
}
if
(
mnodeGetStatus
()
==
MN_STATUS_READY
)
return
0
;
return
mnodeDeploy
();
}
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
dnodeStartMnode
(
pMsg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
}
\ No newline at end of file
source/dnode/mgmt/src/dnodeTransport.c
浏览文件 @
da0a115c
...
...
@@ -21,7 +21,6 @@
#define _DEFAULT_SOURCE
#include "dnodeTransport.h"
#include "dnodeConfig.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeVnodes.h"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录