Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e7ed5e3d
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e7ed5e3d
编写于
3月 14, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
shm
上级
a0722a9d
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
140 addition
and
197 deletion
+140
-197
source/dnode/mgmt/container/inc/dndInt.h
source/dnode/mgmt/container/inc/dndInt.h
+5
-27
source/dnode/mgmt/container/inc/dndTransport.h
source/dnode/mgmt/container/inc/dndTransport.h
+3
-2
source/dnode/mgmt/container/src/dndMonitor.c
source/dnode/mgmt/container/src/dndMonitor.c
+2
-1
source/dnode/mgmt/container/src/dndNode.c
source/dnode/mgmt/container/src/dndNode.c
+50
-70
source/dnode/mgmt/container/src/dndTransport.c
source/dnode/mgmt/container/src/dndTransport.c
+5
-32
source/dnode/mgmt/dnode/inc/dmFile.h
source/dnode/mgmt/dnode/inc/dmFile.h
+7
-11
source/dnode/mgmt/dnode/inc/dmHandle.h
source/dnode/mgmt/dnode/inc/dmHandle.h
+4
-4
source/dnode/mgmt/dnode/inc/dmInt.h
source/dnode/mgmt/dnode/inc/dmInt.h
+22
-0
source/dnode/mgmt/dnode/inc/dmMgmt.h
source/dnode/mgmt/dnode/inc/dmMgmt.h
+4
-4
source/dnode/mgmt/dnode/src/dmFile.c
source/dnode/mgmt/dnode/src/dmFile.c
+33
-40
source/dnode/mgmt/dnode/src/dmMgmt.c
source/dnode/mgmt/dnode/src/dmMgmt.c
+3
-2
source/dnode/mgmt/dnode/src/dmWorker.c
source/dnode/mgmt/dnode/src/dmWorker.c
+2
-4
未找到文件。
source/dnode/mgmt/container/inc/dndInt.h
浏览文件 @
e7ed5e3d
...
...
@@ -61,18 +61,17 @@ typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef
enum
{
DND_WORKER_SINGLE
,
DND_WORKER_MULTI
}
EWorkerType
;
typedef
enum
{
DND_ENV_INIT
,
DND_ENV_READY
,
DND_ENV_CLEANUP
}
EEnvStat
;
typedef
struct
SDnodeMgmt
SDnodeMgmt
;
typedef
struct
SMgmtFp
SMgmtFp
;
typedef
struct
SMgmtWrapper
SMgmtWrapper
;
typedef
struct
SMsgHandle
SMsgHandle
;
typedef
void
(
*
RpcMsgFp
)(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEps
);
typedef
void
(
*
NodeMsgFp
)(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
typedef
int32_t
(
*
MndMsgFp
)(
SDnode
*
pDnode
,
SMndMsg
*
pMsg
);
typedef
int32_t
(
*
OpenNodeFp
)(
SMgmtWrapper
*
pWrapper
);
typedef
void
(
*
CloseNodeFp
)(
S
Dnode
*
pDnode
,
S
MgmtWrapper
*
pWrapper
);
typedef
void
(
*
CloseNodeFp
)(
SMgmtWrapper
*
pWrapper
);
typedef
bool
(
*
RequireNodeFp
)(
SMgmtWrapper
*
pWrapper
);
typedef
int32_t
(
*
MgmtHandleMsgFp
)(
SMgmtWrapper
*
pNode
,
SNodeMsg
*
pMsg
);
typedef
SMsgHandle
(
*
GetMsgHandleFp
)(
SMgmtWrapper
*
pWrapper
,
int32_t
msgIndex
);
typedef
struct
SMsgHandle
{
...
...
@@ -95,25 +94,6 @@ typedef struct {
};
}
SDnodeWorker
;
typedef
struct
{
int32_t
dnodeId
;
int32_t
dropped
;
int64_t
clusterId
;
int64_t
dver
;
int64_t
rebootTime
;
int64_t
updateTime
;
int8_t
statusSent
;
SEpSet
mnodeEpSet
;
SHashObj
*
dnodeHash
;
SArray
*
pDnodeEps
;
pthread_t
*
threadId
;
SRWLatch
latch
;
SDnodeWorker
mgmtWorker
;
SDnodeWorker
statusWorker
;
//
SMsgHandle
msgHandles
[
TDMT_MAX
];
}
SDnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
...
...
@@ -223,8 +203,8 @@ typedef struct SMgmtWrapper {
EProcType
procType
;
SProcObj
*
pProc
;
void
*
pMgmt
;
SMgmtFp
fp
;
SDnode
*
pDnode
;
SMgmtFp
fp
;
}
SMgmtWrapper
;
typedef
struct
SDnode
{
...
...
@@ -234,12 +214,10 @@ typedef struct SDnode {
SDndCfg
cfg
;
SStartupReq
startup
;
TdFilePtr
pLockFile
;
SDnodeMgmt
dmgmt
;
STransMgmt
tmgmt
;
STfs
*
pTfs
;
SMgmtFp
fps
[
NODE_MAX
];
SMgmtWrapper
wrappers
[
NODE_MAX
];
char
*
path
;
}
SDnode
;
EDndStatus
dndGetStatus
(
SDnode
*
pDnode
);
...
...
source/dnode/mgmt/container/inc/dndTransport.h
浏览文件 @
e7ed5e3d
...
...
@@ -22,10 +22,11 @@
extern
"C"
{
#endif
int32_t
dndInitTrans
(
SDnode
*
pDnode
);
void
dndCleanupTrans
(
SDnode
*
pDnode
);
int32_t
dndInitServer
(
SDnode
*
pDnode
);
void
dndCleanupServer
(
SDnode
*
pDnode
);
int32_t
dndInitClient
(
SDnode
*
pDnode
);
void
dndCleanupClient
(
SDnode
*
pDnode
);
int32_t
dndSetMsgHandle
(
SDnode
*
pDnode
);
int32_t
dndSendReqToMnode
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
dndSendReqToDnode
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pRpcMsg
);
...
...
source/dnode/mgmt/container/src/dndMonitor.c
浏览文件 @
e7ed5e3d
...
...
@@ -37,6 +37,8 @@ static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
}
static
void
dndGetMonitorDnodeInfo
(
SDnode
*
pDnode
,
SMonDnodeInfo
*
pInfo
)
{
#if 0
pInfo->uptime = (taosGetTimestampMs() - pDnode->dmgmt.rebootTime) / (86400000.0f);
taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system);
pInfo->cpu_cores = tsNumOfCores;
...
...
@@ -49,7 +51,6 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
taosGetCardInfo(&pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
#if 0
SVnodesStat *pStat = &pDnode->vmgmt.stat;
pInfo->req_select = pStat->numOfSelectReqs;
pInfo->req_insert = pStat->numOfInsertReqs;
...
...
source/dnode/mgmt/container/src/dndNode.c
浏览文件 @
e7ed5e3d
...
...
@@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE
#include "dndNode.h"
#include "dmMgmt.h"
#include "dndTransport.h"
#include "bmInt.h"
#include "dmInt.h"
#include "mmInt.h"
#include "qmInt.h"
#include "smInt.h"
...
...
@@ -43,6 +43,10 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) {
return
required
;
}
static
int32_t
dndOpenNode
(
SMgmtWrapper
*
pWrapper
)
{
return
(
*
pWrapper
->
fp
.
openFp
)(
pWrapper
);
}
static
void
dndCloseNode
(
SMgmtWrapper
*
pWrapper
)
{
(
*
pWrapper
->
fp
.
closeFp
)(
pWrapper
);
}
static
void
dndClearMemory
(
SDnode
*
pDnode
)
{
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pMgmt
=
&
pDnode
->
wrappers
[
n
];
...
...
@@ -51,26 +55,12 @@ static void dndClearMemory(SDnode *pDnode) {
if
(
pDnode
->
pLockFile
!=
NULL
)
{
taosUnLockFile
(
pDnode
->
pLockFile
);
taosCloseFile
(
&
pDnode
->
pLockFile
);
pDnode
->
pLockFile
=
NULL
;
}
tfree
(
pDnode
->
path
);
tfree
(
pDnode
);
dDebug
(
"dnode object memory is cleared, data:%p"
,
pDnode
);
}
static
int32_t
dndInitResource
(
SDnode
*
pDnode
)
{
return
0
;
}
static
void
dndClearResource
(
SDnode
*
pDnode
)
{
dndCleanupTrans
(
pDnode
);
dndStopMgmt
(
pDnode
);
dndCleanupMgmt
(
pDnode
);
tfsClose
(
pDnode
->
pTfs
);
dDebug
(
"dnode object resource is cleared, data:%p"
,
pDnode
);
}
SDnode
*
dndCreate
(
SDndCfg
*
pCfg
)
{
dInfo
(
"start to create dnode object"
);
int32_t
code
=
-
1
;
...
...
@@ -84,12 +74,18 @@ SDnode *dndCreate(SDndCfg *pCfg) {
}
dndSetStatus
(
pDnode
,
DND_STAT_INIT
);
pDnode
->
pLockFile
=
dndCheckRunning
(
pCfg
->
dataDir
);
if
(
pDnode
->
pLockFile
==
NULL
)
{
goto
_OVER
;
}
if
(
dndInitServer
(
pDnode
)
!=
0
)
{
dError
(
"failed to init trans server since %s"
,
terrstr
());
goto
_OVER
;
}
snprintf
(
path
,
sizeof
(
path
),
"%s%sdnode"
,
pCfg
->
dataDir
,
TD_DIRSEP
);
pDnode
->
path
=
strdup
(
path
);
if
(
taosMkDir
(
path
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to create dir:%s since %s"
,
path
,
terrstr
());
if
(
dndInitClient
(
pDnode
)
!=
0
)
{
dError
(
"failed to init trans client since %s"
,
terrstr
());
goto
_OVER
;
}
...
...
@@ -107,10 +103,15 @@ SDnode *dndCreate(SDndCfg *pCfg) {
pDnode
->
wrappers
[
BNODE
].
name
=
"bnode"
;
memcpy
(
&
pDnode
->
cfg
,
pCfg
,
sizeof
(
SDndCfg
));
if
(
dndSetMsgHandle
(
pDnode
)
!=
0
)
{
goto
_OVER
;
}
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
snprintf
(
path
,
sizeof
(
path
),
"%s%s%s"
,
pCfg
->
dataDir
,
TD_DIRSEP
,
pDnode
->
wrappers
[
n
].
name
);
pWrapper
->
path
=
strdup
(
path
);
pWrapper
->
pDnode
=
pDnode
;
if
(
pDnode
->
wrappers
[
n
].
path
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_OVER
;
...
...
@@ -127,17 +128,11 @@ SDnode *dndCreate(SDndCfg *pCfg) {
}
}
pDnode
->
pLockFile
=
dndCheckRunning
(
pCfg
->
dataDir
);
if
(
pDnode
->
pLockFile
==
NULL
)
{
goto
_OVER
;
}
code
=
0
;
_OVER:
if
(
code
!=
0
&&
pDnode
)
{
dndClearMemory
(
pDnode
);
tfree
(
pDnode
);
dError
(
"failed to create dnode object since %s"
,
terrstr
());
}
else
{
dInfo
(
"dnode object is created, data:%p"
,
pDnode
);
...
...
@@ -157,64 +152,53 @@ void dndClose(SDnode *pDnode) {
dInfo
(
"start to close dnode, data:%p"
,
pDnode
);
dndSetStatus
(
pDnode
,
DND_STAT_STOPPED
);
dndClearResource
(
pDnode
);
dndCleanupServer
(
pDnode
);
dndCleanupClient
(
pDnode
);
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
dndCloseNode
(
pWrapper
);
}
dndClearMemory
(
pDnode
);
tfree
(
pDnode
);
dInfo
(
"dnode object is closed, data:%p"
,
pDnode
);
}
static
int32_t
dndOpenNode
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
)
{
// if (tsMultiProcess) {
// SProcCfg cfg = {0};
// pWrapper->pProc = taosProcInit(&cfg);
// if (taosProcIsChild(pWrapper->pProc)) {
// pWrapper->procType = PROC_CHILD;
// dInfo("node:%s, will start in child process", pWrapper->name);
// } else {
// pWrapper->procType = PROC_PARENT;
// dInfo("node:%s, will start in parent process", pWrapper->name);
// }
// } else {
// pWrapper->procType = PROC_SINGLE;
// dInfo("node:%s, will start in single process mnode", pWrapper->name);
// }
// if (pWrapper->procType == PROC_SINGLE || pWrapper->procType == PROC_CHILD) {
// SDndInfo info;
// pWrapper->pNode = (*pWrapper->fp.openFp)(pWrapper->path, &info);
// if (pWrapper != NULL) {
// return -1;
// }
// }
// return 0;
(
*
pWrapper
->
fp
.
openFp
)(
pWrapper
);
return
0
;
}
static
void
dndClearNodeExecpt
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
){}
static
int32_t
dndRunInSingleProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode run in single process mode"
);
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
dInfo
(
"node:%s, will start in single process"
,
pWrapper
->
name
);
if
(
dndOpenNode
(
p
Dnode
,
p
Wrapper
)
!=
0
)
{
if
(
dndOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
}
return
0
;
}
static
void
dndClearNodesExecpt
(
SDnode
*
pDnode
,
ENodeType
except
)
{
dndCleanupServer
(
pDnode
);
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
if
(
except
==
n
)
continue
;
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
dndCloseNode
(
pWrapper
);
}
}
static
int32_t
dndRunInMultiProcess
(
SDnode
*
pDnode
)
{
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
n
==
DNODE
)
{
dInfo
(
"node:%s, will start in parent process"
,
pWrapper
->
name
);
pWrapper
->
procType
=
PROC_PARENT
;
if
(
dndOpenNode
(
p
Dnode
,
p
Wrapper
)
!=
0
)
{
if
(
dndOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -236,17 +220,13 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dndResetLog
(
pWrapper
);
dInfo
(
"node:%s, clean up resources inherited from parent"
,
pWrapper
->
name
);
dndClearNodeExecpt
(
pDnode
,
pWrapper
);
dInfo
(
"node:%s, init trans client in child process"
,
pWrapper
->
name
);
dndInitClient
(
pDnode
);
dndClearNodesExecpt
(
pDnode
,
n
);
dInfo
(
"node:%s, will be initialized in child process"
,
pWrapper
->
name
);
dndOpenNode
(
p
Dnode
,
p
Wrapper
);
dndOpenNode
(
pWrapper
);
}
else
{
dInfo
(
"node:%s, will not start in parent process"
,
pWrapper
->
name
);
pWrapper
->
procType
=
PROC_PARENT
;
dndOpenNode
(
pDnode
,
pWrapper
);
}
}
...
...
source/dnode/mgmt/container/src/dndTransport.c
浏览文件 @
e7ed5e3d
...
...
@@ -15,9 +15,8 @@
#define _DEFAULT_SOURCE
#include "dndTransport.h"
#include "dm
Mgm
t.h"
#include "dm
In
t.h"
#include "mmInt.h"
#include "dmHandle.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
...
...
@@ -47,7 +46,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
}
}
int32_t
dndInitClient
(
SDnode
*
pDnode
)
{
int32_t
dndInitClient
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
SRpcInit
rpcInit
;
...
...
@@ -208,7 +207,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return
rpcRsp
.
code
;
}
int32_t
dndInitServer
(
SDnode
*
pDnode
)
{
int32_t
dndInitServer
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
int32_t
numOfThreads
=
(
int32_t
)((
tsNumOfCores
*
tsNumOfThreadsPerCore
)
/
2
.
0
);
...
...
@@ -238,7 +237,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return
0
;
}
static
void
dndCleanupServer
(
SDnode
*
pDnode
)
{
void
dndCleanupServer
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
if
(
pMgmt
->
serverRpc
)
{
rpcClose
(
pMgmt
->
serverRpc
);
...
...
@@ -247,7 +246,7 @@ static void dndCleanupServer(SDnode *pDnode) {
}
}
static
int32_t
dndSetMsgHandle
(
SDnode
*
pDnode
)
{
int32_t
dndSetMsgHandle
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
for
(
ENodeType
nodeType
=
0
;
nodeType
<
NODE_MAX
;
++
nodeType
)
{
...
...
@@ -274,32 +273,6 @@ static int32_t dndSetMsgHandle(SDnode *pDnode) {
return
0
;
}
int32_t
dndInitTrans
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode-transport start to init"
);
if
(
dndSetMsgHandle
(
pDnode
)
!=
0
)
{
return
-
1
;
}
if
(
dndInitClient
(
pDnode
)
!=
0
)
{
return
-
1
;
}
if
(
dndInitServer
(
pDnode
)
!=
0
)
{
return
-
1
;
}
dInfo
(
"dnode-transport is initialized"
);
return
0
;
}
void
dndCleanupTrans
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode-transport start to clean up"
);
dndCleanupServer
(
pDnode
);
dndCleanupClient
(
pDnode
);
dInfo
(
"dnode-transport is cleaned up"
);
}
int32_t
dndSendReqToDnode
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
if
(
pMgmt
->
clientRpc
==
NULL
)
{
...
...
source/dnode/mgmt/dnode/inc/dmFile.h
浏览文件 @
e7ed5e3d
...
...
@@ -13,25 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_FILE_H_
#define _TD_DND_FILE_H_
#ifndef _TD_DND_
DNODE_
FILE_H_
#define _TD_DND_
DNODE_
FILE_H_
#include "d
nd
Int.h"
#include "d
m
Int.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
dmReadFile
(
SDnode
*
pDnode
);
int32_t
dmWriteFile
(
SDnode
*
pDnode
);
void
dndUpdateDnodeEps
(
SDnode
*
pDnode
,
SArray
*
pDnodeEps
);
void
dndResetDnodes
(
SDnode
*
pDnode
,
SArray
*
pDnodeEps
);
void
dndPrintDnodes
(
SDnode
*
pDnode
);
bool
dndIsEpChanged
(
SDnode
*
pDnode
,
int32_t
dnodeId
,
char
*
pEp
);
int32_t
dmReadFile
(
SDnodeMgmt
*
pMgmt
);
int32_t
dmWriteFile
(
SDnodeMgmt
*
pMgmt
);
void
dndUpdateDnodeEps
(
SDnodeMgmt
*
pMgmt
,
SArray
*
pDnodeEps
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DND_FILE_H_*/
\ No newline at end of file
#endif
/*_TD_DND_DNODE_FILE_H_*/
\ No newline at end of file
source/dnode/mgmt/dnode/inc/dmHandle.h
浏览文件 @
e7ed5e3d
...
...
@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_HADNLE_H_
#define _TD_DND_HADNLE_H_
#ifndef _TD_DND_
DNODE_
HADNLE_H_
#define _TD_DND_
DNODE_
HADNLE_H_
#include "d
nd
Int.h"
#include "d
m
Int.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -33,4 +33,4 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
}
#endif
#endif
/*_TD_DND_HADNLE_H_*/
\ No newline at end of file
#endif
/*_TD_DND_DNODE_HADNLE_H_*/
\ No newline at end of file
source/dnode/mgmt/dnode/inc/dmInt.h
浏览文件 @
e7ed5e3d
...
...
@@ -22,7 +22,29 @@
extern
"C"
{
#endif
typedef
struct
SDnodeMgmt
{
int32_t
dnodeId
;
int32_t
dropped
;
int64_t
clusterId
;
int64_t
dver
;
int64_t
rebootTime
;
int64_t
updateTime
;
int8_t
statusSent
;
SEpSet
mnodeEpSet
;
SHashObj
*
dnodeHash
;
SArray
*
pDnodeEps
;
pthread_t
*
threadId
;
SRWLatch
latch
;
SDnodeWorker
mgmtWorker
;
SDnodeWorker
statusWorker
;
SMsgHandle
msgHandles
[
TDMT_MAX
];
const
char
*
path
;
SDnode
*
pDnode
;
}
SDnodeMgmt
;
SMgmtFp
dmGetMgmtFp
();
void
dndProcessStartupReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dndGetMnodeEpSet
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/dnode/inc/dmMgmt.h
浏览文件 @
e7ed5e3d
...
...
@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_MGMT_H_
#define _TD_DND_MGMT_H_
#ifndef _TD_DND_
DNODE_
MGMT_H_
#define _TD_DND_
DNODE_
MGMT_H_
#include "d
nd
Int.h"
#include "d
m
Int.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -38,4 +38,4 @@ void dndProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) ;
}
#endif
#endif
/*_TD_DND_MGMT_H_*/
\ No newline at end of file
#endif
/*_TD_DND_DNODE_MGMT_H_*/
\ No newline at end of file
source/dnode/mgmt/dnode/src/dmFile.c
浏览文件 @
e7ed5e3d
...
...
@@ -16,8 +16,18 @@
#define _DEFAULT_SOURCE
#include "dmFile.h"
int32_t
dmReadFile
(
SDnode
*
pDnode
)
{
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
static
void
dndPrintDnodes
(
SDnodeMgmt
*
pMgmt
);
static
bool
dndIsEpChanged
(
SDnodeMgmt
*
pMgmt
,
const
char
*
ep
);
static
void
dndResetDnodes
(
SDnodeMgmt
*
pMgmt
,
SArray
*
pDnodeEps
);
int32_t
dmReadFile
(
SDnodeMgmt
*
pMgmt
)
{
int32_t
code
=
TSDB_CODE_DND_DNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
256
*
1024
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
];
TdFilePtr
pFile
=
NULL
;
pMgmt
->
pDnodeEps
=
taosArrayInit
(
1
,
sizeof
(
SDnodeEp
));
if
(
pMgmt
->
pDnodeEps
==
NULL
)
{
...
...
@@ -25,16 +35,8 @@ int32_t dmReadFile(SDnode *pDnode) {
goto
PRASE_DNODE_OVER
;
}
int32_t
code
=
TSDB_CODE_DND_DNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
256
*
1024
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
];
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json"
,
pDnode
->
path
,
TD_DIRSEP
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json"
,
pMgmt
->
path
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
...
...
@@ -128,36 +130,34 @@ int32_t dmReadFile(SDnode *pDnode) {
code
=
0
;
dInfo
(
"succcessed to read file %s"
,
file
);
dndPrintDnodes
(
p
Dnode
);
dndPrintDnodes
(
p
Mgmt
);
PRASE_DNODE_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
if
(
dndIsEpChanged
(
p
Dnode
,
pMgmt
->
dnodeId
,
pDnode
->
cfg
.
localEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
pDnode
->
cfg
.
localEp
,
file
);
if
(
dndIsEpChanged
(
p
Mgmt
,
pMgmt
->
pDnode
->
cfg
.
localEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
p
Mgmt
->
p
Dnode
->
cfg
.
localEp
,
file
);
return
-
1
;
}
if
(
taosArrayGetSize
(
pMgmt
->
pDnodeEps
)
==
0
)
{
SDnodeEp
dnodeEp
=
{
0
};
dnodeEp
.
isMnode
=
1
;
taosGetFqdnPortFromEp
(
pDnode
->
cfg
.
firstEp
,
&
dnodeEp
.
ep
);
taosGetFqdnPortFromEp
(
p
Mgmt
->
p
Dnode
->
cfg
.
firstEp
,
&
dnodeEp
.
ep
);
taosArrayPush
(
pMgmt
->
pDnodeEps
,
&
dnodeEp
);
}
dndResetDnodes
(
p
Dnode
,
pMgmt
->
pDnodeEps
);
dndResetDnodes
(
p
Mgmt
,
pMgmt
->
pDnodeEps
);
terrno
=
0
;
return
0
;
}
int32_t
dmWriteFile
(
SDnode
*
pDnode
)
{
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
int32_t
dmWriteFile
(
SDnodeMgmt
*
pMgmt
)
{
char
file
[
PATH_MAX
];
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json.bak"
,
p
Dnode
->
path
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode.json.bak"
,
p
Mgmt
->
path
,
TD_DIRSEP
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
...
...
@@ -197,7 +197,7 @@ int32_t dmWriteFile(SDnode *pDnode) {
free
(
content
);
char
realfile
[
PATH_MAX
];
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%smnode.json"
,
p
Dnode
->
path
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%smnode.json"
,
p
Mgmt
->
path
,
TD_DIRSEP
);
if
(
taosRenameFile
(
file
,
realfile
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR
;
...
...
@@ -210,31 +210,28 @@ int32_t dmWriteFile(SDnode *pDnode) {
return
0
;
}
void
dndUpdateDnodeEps
(
SDnode
*
pDnode
,
SArray
*
pDnodeEps
)
{
void
dndUpdateDnodeEps
(
SDnode
Mgmt
*
pMgmt
,
SArray
*
pDnodeEps
)
{
int32_t
numOfEps
=
taosArrayGetSize
(
pDnodeEps
);
if
(
numOfEps
<=
0
)
return
;
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
taosWLockLatch
(
&
pMgmt
->
latch
);
int32_t
numOfEpsOld
=
(
int32_t
)
taosArrayGetSize
(
pMgmt
->
pDnodeEps
);
if
(
numOfEps
!=
numOfEpsOld
)
{
dndResetDnodes
(
p
Dnode
,
pDnodeEps
);
dmWriteFile
(
p
Dnode
);
dndResetDnodes
(
p
Mgmt
,
pDnodeEps
);
dmWriteFile
(
p
Mgmt
);
}
else
{
int32_t
size
=
numOfEps
*
sizeof
(
SDnodeEp
);
if
(
memcmp
(
pMgmt
->
pDnodeEps
->
pData
,
pDnodeEps
->
pData
,
size
)
!=
0
)
{
dndResetDnodes
(
p
Dnode
,
pDnodeEps
);
dmWriteFile
(
p
Dnode
);
dndResetDnodes
(
p
Mgmt
,
pDnodeEps
);
dmWriteFile
(
p
Mgmt
);
}
}
taosWUnLockLatch
(
&
pMgmt
->
latch
);
}
void
dndResetDnodes
(
SDnode
*
pDnode
,
SArray
*
pDnodeEps
)
{
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
static
void
dndResetDnodes
(
SDnodeMgmt
*
pMgmt
,
SArray
*
pDnodeEps
)
{
if
(
pMgmt
->
pDnodeEps
!=
pDnodeEps
)
{
SArray
*
tmp
=
pMgmt
->
pDnodeEps
;
pMgmt
->
pDnodeEps
=
taosArrayDup
(
pDnodeEps
);
...
...
@@ -262,12 +259,10 @@ void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps) {
taosHashPut
(
pMgmt
->
dnodeHash
,
&
pDnodeEp
->
id
,
sizeof
(
int32_t
),
pDnodeEp
,
sizeof
(
SDnodeEp
));
}
dndPrintDnodes
(
p
Dnode
);
dndPrintDnodes
(
p
Mgmt
);
}
void
dndPrintDnodes
(
SDnode
*
pDnode
)
{
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
static
void
dndPrintDnodes
(
SDnodeMgmt
*
pMgmt
)
{
int32_t
numOfEps
=
(
int32_t
)
taosArrayGetSize
(
pMgmt
->
pDnodeEps
);
dDebug
(
"print dnode ep list, num:%d"
,
numOfEps
);
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
i
++
)
{
...
...
@@ -276,17 +271,15 @@ void dndPrintDnodes(SDnode *pDnode) {
}
}
bool
dndIsEpChanged
(
SDnode
*
pDnode
,
int32_t
dnodeId
,
char
*
pE
p
)
{
static
bool
dndIsEpChanged
(
SDnodeMgmt
*
pMgmt
,
const
char
*
e
p
)
{
bool
changed
=
false
;
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
taosRLockLatch
(
&
pMgmt
->
latch
);
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
pMgmt
->
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
pMgmt
->
dnodeHash
,
&
pMgmt
->
dnodeId
,
sizeof
(
int32_t
));
if
(
pDnodeEp
!=
NULL
)
{
char
epstr
[
TSDB_EP_LEN
+
1
];
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
pDnodeEp
->
ep
.
fqdn
,
pDnodeEp
->
ep
.
port
);
changed
=
strcmp
(
pE
p
,
epstr
)
!=
0
;
changed
=
strcmp
(
e
p
,
epstr
)
!=
0
;
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
...
...
source/dnode/mgmt/dnode/src/dmMgmt.c
浏览文件 @
e7ed5e3d
...
...
@@ -296,6 +296,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) {
pMgmt
->
rebootTime
=
taosGetTimestampMs
();
pMgmt
->
dropped
=
0
;
pMgmt
->
clusterId
=
0
;
pMgmt
->
path
=
pWrapper
->
path
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dnodeHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
...
...
@@ -305,7 +306,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) {
return
-
1
;
}
if
(
dmReadFile
(
p
Wrapper
->
pDnode
)
!=
0
)
{
if
(
dmReadFile
(
p
Mgmt
)
!=
0
)
{
dError
(
"node:%s, failed to read file since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -352,7 +353,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) {
#endif
}
static
void
dmCleanup
(
S
Dnode
*
pDnode
,
S
MgmtWrapper
*
pWrapper
){
static
void
dmCleanup
(
SMgmtWrapper
*
pWrapper
){
}
...
...
source/dnode/mgmt/dnode/src/dmWorker.c
浏览文件 @
e7ed5e3d
...
...
@@ -18,10 +18,9 @@
#include "dndWorker.h"
#include "dmHandle.h"
static
void
*
dnodeThreadRoutine
(
void
*
param
)
{
SDnode
*
pDnode
=
param
;
SDnode
Mgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
SDnode
Mgmt
*
pMgmt
=
param
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
int64_t
lastStatusTime
=
taosGetTimestampMs
();
int64_t
lastMonitorTime
=
lastStatusTime
;
...
...
@@ -50,7 +49,6 @@ static void *dnodeThreadRoutine(void *param) {
}
}
static
void
dndProcessMgmtQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录