Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d1a913d8
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d1a913d8
编写于
3月 10, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
shm
上级
c6e6ac21
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
390 addition
and
361 deletion
+390
-361
source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h
source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h
+22
-17
source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c
source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c
+139
-0
source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c
source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c
+131
-337
source/dnode/mgmt/impl/mnodeMgmt/src/mmProc.c
source/dnode/mgmt/impl/mnodeMgmt/src/mmProc.c
+0
-0
source/dnode/mgmt/impl/mnodeMgmt/src/mmQueue.c
source/dnode/mgmt/impl/mnodeMgmt/src/mmQueue.c
+0
-0
source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c
source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c
+95
-4
source/dnode/mgmt/impl/src/dndMgmt.c
source/dnode/mgmt/impl/src/dndMgmt.c
+3
-3
未找到文件。
source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h
浏览文件 @
d1a913d8
...
@@ -24,32 +24,37 @@ extern "C" {
...
@@ -24,32 +24,37 @@ extern "C" {
// interface
// interface
int32_t
mmInit
(
SDnode
*
pDnode
);
int32_t
mmInit
(
SDnode
*
pDnode
);
void
mmCleanup
(
SDnode
*
pDnode
);
void
mmCleanup
(
SDnode
*
pDnode
);
int32_t
mmProcessCreateMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
// internal
int32_t
mmProcessAlterMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
void
mmInitMsgFp
(
SMnodeMgmt
*
pMgmt
);
int32_t
mmProcessDropMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
SMnode
*
mmAcquire
(
SDnode
*
pDnode
);
void
mmRelease
(
SDnode
*
pDnode
,
SMnode
*
pMnode
);
// mmFile
// mmFile
int32_t
mmReadFile
(
SDnode
*
pDnode
);
int32_t
mmReadFile
(
SDnode
*
pDnode
);
int32_t
mmWriteFile
(
SDnode
*
pDnode
);
int32_t
mmWriteFile
(
SDnode
*
pDnode
);
// mmMsg
// mmHandle
void
mmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
// mmQueue
int32_t
mmWriteToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SMnodeMsg
*
pMnodeMsg
);
////////////
int32_t
dndGetUserAuthFromMnode
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
int32_t
dndGetUserAuthFromMnode
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
int32_t
dndProcessCreateMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
dndProcessAlterMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
dndProcessDropMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
mmGetMonitorInfo
(
SDnode
*
pDnode
,
SMonClusterInfo
*
pClusterInfo
,
SMonVgroupInfo
*
pVgroupInfo
,
int32_t
mmGetMonitorInfo
(
SDnode
*
pDnode
,
SMonClusterInfo
*
pClusterInfo
,
SMonVgroupInfo
*
pVgroupInfo
,
SMonGrantInfo
*
pGrantInfo
);
SMonGrantInfo
*
pGrantInfo
);
// mmMgmt
SMnode
*
mmAcquire
(
SDnode
*
pDnode
);
void
mmRelease
(
SDnode
*
pDnode
,
SMnode
*
pMnode
);
int32_t
mmOpen
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
);
int32_t
mmAlter
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
);
int32_t
mmDrop
(
SDnode
*
pDnode
);
int32_t
mmBuildOptionFromReq
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
,
SDCreateMnodeReq
*
pCreate
);
// mmWorker
int32_t
mmStartWorker
(
SDnode
*
pDnode
);
void
mmStopWorker
(
SDnode
*
pDnode
);
void
mmInitMsgFp
(
SMnodeMgmt
*
pMgmt
);
void
mmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
int32_t
mmPutMsgToWriteQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
mmPutMsgToReadQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
void
mmConsumeChildQueue
(
SDnode
*
pDnode
,
SBlockItem
*
pBlock
);
void
mmConsumeParentQueue
(
SMnodeMgmt
*
pMgmt
,
SBlockItem
*
pBlock
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c
0 → 100644
浏览文件 @
d1a913d8
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mm.h"
#include "dndMgmt.h"
int32_t
mmProcessCreateMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SDCreateMnodeReq
createReq
=
{
0
};
if
(
tDeserializeSDCreateMnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
createReq
.
replica
<=
1
||
createReq
.
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnodeOpt
option
=
{
0
};
if
(
mmBuildOptionFromReq
(
pDnode
,
&
option
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
mmRelease
(
pDnode
,
pMnode
);
terrno
=
TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
}
dDebug
(
"start to create mnode"
);
return
mmOpen
(
pDnode
,
&
option
);
}
int32_t
mmProcessAlterMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SDAlterMnodeReq
alterReq
=
{
0
};
if
(
tDeserializeSDCreateMnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
alterReq
.
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnodeOpt
option
=
{
0
};
if
(
mmBuildOptionFromReq
(
pDnode
,
&
option
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
;
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
}
dDebug
(
"start to alter mnode"
);
int32_t
code
=
mmAlter
(
pDnode
,
&
option
);
mmRelease
(
pDnode
,
pMnode
);
return
code
;
}
int32_t
mmProcessDropMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SDDropMnodeReq
dropReq
=
{
0
};
if
(
tDeserializeSMCreateDropMnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
dropReq
.
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
;
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
dDebug
(
"start to drop mnode"
);
int32_t
code
=
mmDrop
(
pDnode
);
mmRelease
(
pDnode
,
pMnode
);
return
code
;
}
int32_t
mmGetMonitorInfo
(
SDnode
*
pDnode
,
SMonClusterInfo
*
pClusterInfo
,
SMonVgroupInfo
*
pVgroupInfo
,
SMonGrantInfo
*
pGrantInfo
)
{
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
return
-
1
;
int32_t
code
=
mndGetMonitorInfo
(
pMnode
,
pClusterInfo
,
pVgroupInfo
,
pGrantInfo
);
mmRelease
(
pDnode
,
pMnode
);
return
code
;
}
int32_t
dndGetUserAuthFromMnode
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
dTrace
(
"failed to get user auth since %s"
,
terrstr
());
return
-
1
;
}
int32_t
code
=
mndRetriveAuth
(
pMnode
,
user
,
spi
,
encrypt
,
secret
,
ckey
);
mmRelease
(
pDnode
,
pMnode
);
dTrace
(
"user:%s, retrieve auth spi:%d encrypt:%d"
,
user
,
*
spi
,
*
encrypt
);
return
code
;
}
source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c
浏览文件 @
d1a913d8
...
@@ -18,11 +18,72 @@
...
@@ -18,11 +18,72 @@
#include "dndMgmt.h"
#include "dndMgmt.h"
#include "dndTransport.h"
#include "dndTransport.h"
#include "dndWorker.h"
static
void
mmInitOption
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
);
static
void
mmBuildOptionForDeploy
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
);
static
void
mmBuildOptionForOpen
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
);
static
bool
mmDeployRequired
(
SDnode
*
pDnode
);
static
int32_t
mmOpenImp
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
);
int32_t
mmInit
(
SDnode
*
pDnode
)
{
dInfo
(
"mnode mgmt start to init"
);
int32_t
code
=
-
1
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
mmInitMsgFp
(
pMgmt
);
if
(
mmReadFile
(
pDnode
)
!=
0
)
{
goto
_OVER
;
}
if
(
pMgmt
->
dropped
)
{
dInfo
(
"mnode has been dropped and needs to be deleted"
);
mndDestroy
(
pDnode
->
dir
.
mnode
);
code
=
0
;
goto
_OVER
;
}
if
(
!
pMgmt
->
deployed
)
{
bool
required
=
mmDeployRequired
(
pDnode
);
if
(
!
required
)
{
dInfo
(
"mnode does not need to be deployed"
);
code
=
0
;
goto
_OVER
;
}
dInfo
(
"mnode start to deploy"
);
SMnodeOpt
option
=
{
0
};
mmBuildOptionForDeploy
(
pDnode
,
&
option
);
code
=
mmOpen
(
pDnode
,
&
option
);
}
else
{
dInfo
(
"mnode start to open"
);
SMnodeOpt
option
=
{
0
};
mmBuildOptionForOpen
(
pDnode
,
&
option
);
code
=
mmOpen
(
pDnode
,
&
option
);
}
_OVER:
if
(
code
==
0
)
{
dInfo
(
"mnode mgmt init success"
);
}
else
{
dError
(
"failed to init mnode mgmt since %s"
,
terrstr
());
mmCleanup
(
pDnode
);
}
return
code
;
}
static
void
dndProcessMnodeQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
void
mmCleanup
(
SDnode
*
pDnode
)
{
dInfo
(
"mnode mgmt start to clean up"
);
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
if
(
pMgmt
->
pMnode
)
{
mmStopWorker
(
pDnode
);
mndClose
(
pMgmt
->
pMnode
);
pMgmt
->
pMnode
=
NULL
;
}
dInfo
(
"mnode mgmt is cleaned up"
);
}
SMnode
*
mmAcquire
(
SDnode
*
pDnode
)
{
SMnode
*
mmAcquire
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
...
@@ -54,40 +115,84 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) {
...
@@ -54,40 +115,84 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) {
dTrace
(
"release mnode, refCount:%d"
,
refCount
);
dTrace
(
"release mnode, refCount:%d"
,
refCount
);
}
}
static
int32_t
mmStartWorker
(
SDnode
*
pDnode
)
{
int32_t
mmOpen
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
readWorker
,
DND_WORKER_SINGLE
,
"mnode-read"
,
0
,
1
,
dndProcessMnodeQueue
)
!=
0
)
{
pMgmt
->
singleProc
=
true
;
dError
(
"failed to start mnode read worker since %s"
,
terrstr
());
pMgmt
->
isChild
=
false
;
int32_t
code
=
mmOpenImp
(
pDnode
,
pOption
);
if
(
code
==
0
&&
!
pMgmt
->
singleProc
)
{
SProcCfg
cfg
=
{
0
};
cfg
.
childFp
=
(
ProcFp
)
mmConsumeChildQueue
;
cfg
.
parentFp
=
(
ProcFp
)
mmConsumeParentQueue
;
cfg
.
childQueueSize
=
1024
*
1024
;
cfg
.
parentQueueSize
=
1024
*
1024
;
pMgmt
->
pProcess
=
taosProcInit
(
&
cfg
);
if
(
pMgmt
->
pProcess
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
writeWorker
,
DND_WORKER_SINGLE
,
"mnode-write"
,
0
,
1
,
dndProcessMnodeQueue
)
!=
0
)
{
pMgmt
->
pProcess
->
pParent
=
pDnode
;
dError
(
"failed to start mnode write worker since %s"
,
terrstr
());
pMgmt
->
pProcess
->
testFlag
=
true
;
return
taosProcStart
(
pMgmt
->
pProcess
);
}
return
code
;
}
int32_t
mmAlter
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
syncWorker
,
DND_WORKER_SINGLE
,
"mnode-sync"
,
0
,
1
,
dndProcessMnodeQueue
)
!=
0
)
{
if
(
mndAlter
(
pMnode
,
pOption
)
!=
0
)
{
dError
(
"failed to start mnode sync worker since %s"
,
terrstr
());
dError
(
"failed to alter mnode since %s"
,
terrstr
());
mmRelease
(
pDnode
,
pMnode
);
return
-
1
;
return
-
1
;
}
}
mmRelease
(
pDnode
,
pMnode
);
return
0
;
return
0
;
}
}
static
void
mmStopWorker
(
SDnode
*
pDnode
)
{
int32_t
mmDrop
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
taosWLockLatch
(
&
pMgmt
->
latch
);
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
pMgmt
->
deployed
=
0
;
if
(
pMnode
==
NULL
)
{
taosWUnLockLatch
(
&
pMgmt
->
latch
);
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
taosRLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dropped
=
1
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
mmWriteFile
(
pDnode
)
!=
0
)
{
taosRLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dropped
=
0
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
while
(
pMgmt
->
refCount
>
1
)
{
mmRelease
(
pDnode
,
pMnode
);
taosMsleep
(
10
);
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
}
dndCleanupWorker
(
&
pMgmt
->
readWorker
);
mmRelease
(
pDnode
,
pMnode
);
dndCleanupWorker
(
&
pMgmt
->
writeWorker
);
mmStopWorker
(
pDnode
);
dndCleanupWorker
(
&
pMgmt
->
syncWorker
);
pMgmt
->
deployed
=
0
;
mmWriteFile
(
pDnode
);
mndClose
(
pMnode
);
pMgmt
->
pMnode
=
NULL
;
mndDestroy
(
pDnode
->
dir
.
mnode
);
return
0
;
}
}
static
bool
mmDeployRequired
(
SDnode
*
pDnode
)
{
static
bool
mmDeployRequired
(
SDnode
*
pDnode
)
{
...
@@ -106,43 +211,7 @@ static bool mmDeployRequired(SDnode *pDnode) {
...
@@ -106,43 +211,7 @@ static bool mmDeployRequired(SDnode *pDnode) {
return
true
;
return
true
;
}
}
static
int32_t
mmPutMsgToQueue
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SRpcMsg
*
pRpcMsg
)
{
static
void
mmInitOption
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
int32_t
contLen
=
sizeof
(
SMnodeMsg
)
+
pRpcMsg
->
contLen
;
SMnodeMsg
*
pMnodeMsg
=
taosAllocateQitem
(
contLen
);
if
(
pMnodeMsg
==
NULL
)
{
return
-
1
;
}
pMnodeMsg
->
contLen
=
pRpcMsg
->
contLen
;
pMnodeMsg
->
pCont
=
(
char
*
)
pMnodeMsg
+
sizeof
(
SMnodeMsg
);
memcpy
(
pMnodeMsg
->
pCont
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
int32_t
code
=
mmWriteToWorker
(
pDnode
,
pWorker
,
pMnodeMsg
);
if
(
code
!=
0
)
{
taosFreeQitem
(
pMnodeMsg
);
}
return
code
;
}
static
int32_t
mmPutMsgToWriteQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
return
mmPutMsgToQueue
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pRpcMsg
);
}
static
int32_t
mmPutMsgToReadQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
return
mmPutMsgToQueue
(
pDnode
,
&
pDnode
->
mmgmt
.
readWorker
,
pRpcMsg
);
}
static
void
mmProcessChildQueue
(
SDnode
*
pDnode
,
SBlockItem
*
pBlock
)
{
SMnodeMsg
*
pMsg
=
(
SMnodeMsg
*
)
pBlock
->
pCont
;
if
(
mmWriteToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pMsg
)
!=
0
)
{
//todo
}
}
static
void
mmInitOptionImp
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
pOption
->
pDnode
=
pDnode
;
pOption
->
pDnode
=
pDnode
;
pOption
->
sendReqToDnodeFp
=
dndSendReqToDnode
;
pOption
->
sendReqToDnodeFp
=
dndSendReqToDnode
;
pOption
->
sendReqToMnodeFp
=
dndSendReqToMnode
;
pOption
->
sendReqToMnodeFp
=
dndSendReqToMnode
;
...
@@ -153,8 +222,8 @@ static void mmInitOptionImp(SDnode *pDnode, SMnodeOpt *pOption) {
...
@@ -153,8 +222,8 @@ static void mmInitOptionImp(SDnode *pDnode, SMnodeOpt *pOption) {
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
}
}
static
void
mm
InitDeployOption
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
static
void
mm
BuildOptionForDeploy
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
mmInitOption
Imp
(
pDnode
,
pOption
);
mmInitOption
(
pDnode
,
pOption
);
pOption
->
replica
=
1
;
pOption
->
replica
=
1
;
pOption
->
selfIndex
=
0
;
pOption
->
selfIndex
=
0
;
SReplica
*
pReplica
=
&
pOption
->
replicas
[
0
];
SReplica
*
pReplica
=
&
pOption
->
replicas
[
0
];
...
@@ -168,16 +237,16 @@ static void mmInitDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
...
@@ -168,16 +237,16 @@ static void mmInitDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
memcpy
(
&
pMgmt
->
replicas
,
pOption
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
memcpy
(
&
pMgmt
->
replicas
,
pOption
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
}
}
static
void
mm
InitOpenOptio
n
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
static
void
mm
BuildOptionForOpe
n
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
mmInitOption
Imp
(
pDnode
,
pOption
);
mmInitOption
(
pDnode
,
pOption
);
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
pOption
->
selfIndex
=
pMgmt
->
selfIndex
;
pOption
->
selfIndex
=
pMgmt
->
selfIndex
;
pOption
->
replica
=
pMgmt
->
replica
;
pOption
->
replica
=
pMgmt
->
replica
;
memcpy
(
&
pOption
->
replicas
,
pMgmt
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
memcpy
(
&
pOption
->
replicas
,
pMgmt
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
}
}
static
int32_t
mmInit
OptionFromReq
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
,
SDCreateMnodeReq
*
pCreate
)
{
int32_t
mmBuild
OptionFromReq
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
,
SDCreateMnodeReq
*
pCreate
)
{
mmInitOption
Imp
(
pDnode
,
pOption
);
mmInitOption
(
pDnode
,
pOption
);
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
...
@@ -239,278 +308,3 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
...
@@ -239,278 +308,3 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
dInfo
(
"mnode open successfully"
);
dInfo
(
"mnode open successfully"
);
return
0
;
return
0
;
}
}
static
void
dndMnodeProcessParentQueue
(
SMnodeMgmt
*
pMgmt
,
SBlockItem
*
pItem
)
{}
static
int32_t
mmOpen
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
pMgmt
->
singleProc
=
false
;
int32_t
code
=
mmOpenImp
(
pDnode
,
pOption
);
if
(
code
==
0
&&
!
pMgmt
->
singleProc
)
{
SProcCfg
cfg
=
{
0
};
cfg
.
childFp
=
(
ProcFp
)
mmProcessChildQueue
;
cfg
.
parentFp
=
(
ProcFp
)
dndMnodeProcessParentQueue
;
cfg
.
childQueueSize
=
1024
*
1024
;
cfg
.
parentQueueSize
=
1024
*
1024
;
pMgmt
->
pProcess
=
taosProcInit
(
&
cfg
);
if
(
pMgmt
->
pProcess
==
NULL
)
{
return
-
1
;
}
pMgmt
->
pProcess
->
pParent
=
pDnode
;
pMgmt
->
pProcess
->
testFlag
=
true
;
return
taosProcStart
(
pMgmt
->
pProcess
);
}
return
code
;
}
static
int32_t
dndAlterMnode
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
}
if
(
mndAlter
(
pMnode
,
pOption
)
!=
0
)
{
dError
(
"failed to alter mnode since %s"
,
terrstr
());
mmRelease
(
pDnode
,
pMnode
);
return
-
1
;
}
mmRelease
(
pDnode
,
pMnode
);
return
0
;
}
static
int32_t
dndDropMnode
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
taosRLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dropped
=
1
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
mmWriteFile
(
pDnode
)
!=
0
)
{
taosRLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dropped
=
0
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
mmRelease
(
pDnode
,
pMnode
);
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
mmRelease
(
pDnode
,
pMnode
);
mmStopWorker
(
pDnode
);
pMgmt
->
deployed
=
0
;
mmWriteFile
(
pDnode
);
mndClose
(
pMnode
);
pMgmt
->
pMnode
=
NULL
;
mndDestroy
(
pDnode
->
dir
.
mnode
);
return
0
;
}
int32_t
dndProcessCreateMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SDCreateMnodeReq
createReq
=
{
0
};
if
(
tDeserializeSDCreateMnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
createReq
.
replica
<=
1
||
createReq
.
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnodeOpt
option
=
{
0
};
if
(
mmInitOptionFromReq
(
pDnode
,
&
option
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
mmRelease
(
pDnode
,
pMnode
);
terrno
=
TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
}
dDebug
(
"start to create mnode"
);
return
mmOpenImp
(
pDnode
,
&
option
);
}
int32_t
dndProcessAlterMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SDAlterMnodeReq
alterReq
=
{
0
};
if
(
tDeserializeSDCreateMnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
alterReq
.
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnodeOpt
option
=
{
0
};
if
(
mmInitOptionFromReq
(
pDnode
,
&
option
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
;
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
}
dDebug
(
"start to alter mnode"
);
int32_t
code
=
dndAlterMnode
(
pDnode
,
&
option
);
mmRelease
(
pDnode
,
pMnode
);
return
code
;
}
int32_t
dndProcessDropMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
SDDropMnodeReq
dropReq
=
{
0
};
if
(
tDeserializeSMCreateDropMnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
dropReq
.
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_INVALID_OPTION
;
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
;
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
dDebug
(
"start to drop mnode"
);
int32_t
code
=
dndDropMnode
(
pDnode
);
mmRelease
(
pDnode
,
pMnode
);
return
code
;
}
static
void
dndProcessMnodeQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
mndProcessMsg
(
pMsg
);
mmRelease
(
pDnode
,
pMnode
);
}
else
{
mndSendRsp
(
pMsg
,
terrno
);
}
// mndCleanupMsg(pMsg);
}
int32_t
mmInit
(
SDnode
*
pDnode
)
{
dInfo
(
"mnode mgmt start to init"
);
int32_t
code
=
-
1
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
mmInitMsgFp
(
pMgmt
);
if
(
mmReadFile
(
pDnode
)
!=
0
)
{
goto
_OVER
;
}
if
(
pMgmt
->
dropped
)
{
dInfo
(
"mnode has been dropped and needs to be deleted"
);
mndDestroy
(
pDnode
->
dir
.
mnode
);
code
=
0
;
goto
_OVER
;
}
if
(
!
pMgmt
->
deployed
)
{
bool
required
=
mmDeployRequired
(
pDnode
);
if
(
!
required
)
{
dInfo
(
"mnode does not need to be deployed"
);
code
=
0
;
goto
_OVER
;
}
dInfo
(
"mnode start to deploy"
);
SMnodeOpt
option
=
{
0
};
mmInitDeployOption
(
pDnode
,
&
option
);
code
=
mmOpen
(
pDnode
,
&
option
);
}
else
{
dInfo
(
"mnode start to open"
);
SMnodeOpt
option
=
{
0
};
mmInitOpenOption
(
pDnode
,
&
option
);
code
=
mmOpen
(
pDnode
,
&
option
);
}
_OVER:
if
(
code
==
0
)
{
dInfo
(
"mnode mgmt init success"
);
}
else
{
dError
(
"failed to init mnode mgmt since %s"
,
terrstr
());
mmCleanup
(
pDnode
);
}
return
code
;
}
void
mmCleanup
(
SDnode
*
pDnode
)
{
dInfo
(
"mnode mgmt start to clean up"
);
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
if
(
pMgmt
->
pMnode
)
{
mmStopWorker
(
pDnode
);
mndClose
(
pMgmt
->
pMnode
);
pMgmt
->
pMnode
=
NULL
;
}
dInfo
(
"mnode mgmt is cleaned up"
);
}
int32_t
dndGetUserAuthFromMnode
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
dTrace
(
"failed to get user auth since %s"
,
terrstr
());
return
-
1
;
}
int32_t
code
=
mndRetriveAuth
(
pMnode
,
user
,
spi
,
encrypt
,
secret
,
ckey
);
mmRelease
(
pDnode
,
pMnode
);
dTrace
(
"user:%s, retrieve auth spi:%d encrypt:%d"
,
user
,
*
spi
,
*
encrypt
);
return
code
;
}
int32_t
mmGetMonitorInfo
(
SDnode
*
pDnode
,
SMonClusterInfo
*
pClusterInfo
,
SMonVgroupInfo
*
pVgroupInfo
,
SMonGrantInfo
*
pGrantInfo
)
{
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
return
-
1
;
int32_t
code
=
mndGetMonitorInfo
(
pMnode
,
pClusterInfo
,
pVgroupInfo
,
pGrantInfo
);
mmRelease
(
pDnode
,
pMnode
);
return
code
;
}
source/dnode/mgmt/impl/mnodeMgmt/src/mmProc.c
已删除
100644 → 0
浏览文件 @
c6e6ac21
source/dnode/mgmt/impl/mnodeMgmt/src/mmQueue.c
已删除
100644 → 0
浏览文件 @
c6e6ac21
source/dnode/mgmt/impl/mnodeMgmt/src/mm
Msg
.c
→
source/dnode/mgmt/impl/mnodeMgmt/src/mm
Worker
.c
浏览文件 @
d1a913d8
...
@@ -23,6 +23,45 @@
...
@@ -23,6 +23,45 @@
static
int32_t
mmProcessWriteMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmProcessWriteMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmProcessSyncMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmProcessSyncMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmProcessReadMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmProcessReadMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmPutMsgToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmPutRpcMsgToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SRpcMsg
*
pRpcMsg
);
static
void
mmConsumeQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
int32_t
mmStartWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
readWorker
,
DND_WORKER_SINGLE
,
"mnode-read"
,
0
,
1
,
mmConsumeQueue
)
!=
0
)
{
dError
(
"failed to start mnode read worker since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
writeWorker
,
DND_WORKER_SINGLE
,
"mnode-write"
,
0
,
1
,
mmConsumeQueue
)
!=
0
)
{
dError
(
"failed to start mnode write worker since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
syncWorker
,
DND_WORKER_SINGLE
,
"mnode-sync"
,
0
,
1
,
mmConsumeQueue
)
!=
0
)
{
dError
(
"failed to start mnode sync worker since %s"
,
terrstr
());
return
-
1
;
}
return
0
;
}
void
mmStopWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
taosWLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
deployed
=
0
;
taosWUnLockLatch
(
&
pMgmt
->
latch
);
while
(
pMgmt
->
refCount
>
1
)
{
taosMsleep
(
10
);
}
dndCleanupWorker
(
&
pMgmt
->
readWorker
);
dndCleanupWorker
(
&
pMgmt
->
writeWorker
);
dndCleanupWorker
(
&
pMgmt
->
syncWorker
);
}
void
mmInitMsgFp
(
SMnodeMgmt
*
pMgmt
)
{
void
mmInitMsgFp
(
SMnodeMgmt
*
pMgmt
)
{
// Requests handled by DNODE
// Requests handled by DNODE
...
@@ -151,18 +190,26 @@ _OVER:
...
@@ -151,18 +190,26 @@ _OVER:
}
}
int32_t
mmProcessWriteMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
int32_t
mmProcessWriteMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
return
mm
Write
ToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pMnodeMsg
);
return
mm
PutMsg
ToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pMnodeMsg
);
}
}
int32_t
mmProcessSyncMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
int32_t
mmProcessSyncMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
return
mm
Write
ToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
syncWorker
,
pMnodeMsg
);
return
mm
PutMsg
ToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
syncWorker
,
pMnodeMsg
);
}
}
int32_t
mmProcessReadMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
int32_t
mmProcessReadMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
return
mmWriteToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
readWorker
,
pMnodeMsg
);
return
mmPutMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
readWorker
,
pMnodeMsg
);
}
int32_t
mmPutMsgToWriteQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
return
mmPutRpcMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pRpcMsg
);
}
}
int32_t
mmWriteToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SMnodeMsg
*
pMnodeMsg
)
{
int32_t
mmPutMsgToReadQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
return
mmPutRpcMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
readWorker
,
pRpcMsg
);
}
static
int32_t
mmPutMsgToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SMnodeMsg
*
pMnodeMsg
)
{
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
return
-
1
;
if
(
pMnode
==
NULL
)
return
-
1
;
...
@@ -172,3 +219,47 @@ int32_t mmWriteToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnode
...
@@ -172,3 +219,47 @@ int32_t mmWriteToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnode
mmRelease
(
pDnode
,
pMnode
);
mmRelease
(
pDnode
,
pMnode
);
return
code
;
return
code
;
}
}
static
int32_t
mmPutRpcMsgToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
contLen
=
sizeof
(
SMnodeMsg
)
+
pRpcMsg
->
contLen
;
SMnodeMsg
*
pMnodeMsg
=
taosAllocateQitem
(
contLen
);
if
(
pMnodeMsg
==
NULL
)
{
return
-
1
;
}
pMnodeMsg
->
contLen
=
pRpcMsg
->
contLen
;
pMnodeMsg
->
pCont
=
(
char
*
)
pMnodeMsg
+
sizeof
(
SMnodeMsg
);
memcpy
(
pMnodeMsg
->
pCont
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
int32_t
code
=
mmPutMsgToWorker
(
pDnode
,
pWorker
,
pMnodeMsg
);
if
(
code
!=
0
)
{
taosFreeQitem
(
pMnodeMsg
);
}
return
code
;
}
void
mmConsumeChildQueue
(
SDnode
*
pDnode
,
SBlockItem
*
pBlock
)
{
SMnodeMsg
*
pMsg
=
(
SMnodeMsg
*
)
pBlock
->
pCont
;
if
(
mmPutMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pMsg
)
!=
0
)
{
// todo
}
}
void
mmConsumeParentQueue
(
SMnodeMgmt
*
pMgmt
,
SBlockItem
*
pBlock
)
{}
static
void
mmConsumeQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
mndProcessMsg
(
pMsg
);
mmRelease
(
pDnode
,
pMnode
);
}
else
{
mndSendRsp
(
pMsg
,
terrno
);
}
// mndCleanupMsg(pMsg);
}
source/dnode/mgmt/impl/src/dndMgmt.c
浏览文件 @
d1a913d8
...
@@ -691,13 +691,13 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
...
@@ -691,13 +691,13 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
switch
(
pMsg
->
msgType
)
{
switch
(
pMsg
->
msgType
)
{
case
TDMT_DND_CREATE_MNODE
:
case
TDMT_DND_CREATE_MNODE
:
code
=
dnd
ProcessCreateMnodeReq
(
pDnode
,
pMsg
);
code
=
mm
ProcessCreateMnodeReq
(
pDnode
,
pMsg
);
break
;
break
;
case
TDMT_DND_ALTER_MNODE
:
case
TDMT_DND_ALTER_MNODE
:
code
=
dnd
ProcessAlterMnodeReq
(
pDnode
,
pMsg
);
code
=
mm
ProcessAlterMnodeReq
(
pDnode
,
pMsg
);
break
;
break
;
case
TDMT_DND_DROP_MNODE
:
case
TDMT_DND_DROP_MNODE
:
code
=
dnd
ProcessDropMnodeReq
(
pDnode
,
pMsg
);
code
=
mm
ProcessDropMnodeReq
(
pDnode
,
pMsg
);
break
;
break
;
case
TDMT_DND_CREATE_QNODE
:
case
TDMT_DND_CREATE_QNODE
:
code
=
dndProcessCreateQnodeReq
(
pDnode
,
pMsg
);
code
=
dndProcessCreateQnodeReq
(
pDnode
,
pMsg
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录