Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b6c28a49
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b6c28a49
编写于
11月 26, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update authentications
上级
04ea2358
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
101 addition
and
76 deletion
+101
-76
source/dnode/mgmt/impl/inc/dndInt.h
source/dnode/mgmt/impl/inc/dndInt.h
+2
-2
source/dnode/mgmt/impl/src/dndDnode.c
source/dnode/mgmt/impl/src/dndDnode.c
+4
-4
source/dnode/mgmt/impl/src/dndMnode.c
source/dnode/mgmt/impl/src/dndMnode.c
+38
-38
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+57
-21
source/dnode/mnode/impl/src/mnodeAuth.c
source/dnode/mnode/impl/src/mnodeAuth.c
+0
-11
未找到文件。
source/dnode/mgmt/impl/inc/dndInt.h
浏览文件 @
b6c28a49
...
...
@@ -109,9 +109,9 @@ typedef struct SDnode {
SDnodeOpt
opt
;
SDnodeDir
dir
;
SDnodeMgmt
dmgmt
;
SMnodeMgmt
m
;
SMnodeMgmt
m
mgmt
;
SVnodesMgmt
vmgmt
;
STransMgmt
t
;
STransMgmt
t
mgmt
;
SStartupMsg
startup
;
}
SDnode
;
...
...
source/dnode/mgmt/impl/src/dndDnode.c
浏览文件 @
b6c28a49
...
...
@@ -84,17 +84,17 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
rpcSendRedirectRsp
(
pMsg
->
handle
,
&
epSet
);
}
static
void
dndUpdateMnodeEpSet
(
SDnode
*
pDn
d
,
SEpSet
*
pEpSet
)
{
static
void
dndUpdateMnodeEpSet
(
SDnode
*
pDn
ode
,
SEpSet
*
pEpSet
)
{
dInfo
(
"mnode is changed, num:%d inUse:%d"
,
pEpSet
->
numOfEps
,
pEpSet
->
inUse
);
dndWLockDnode
(
pDn
d
);
dndWLockDnode
(
pDn
ode
);
pDn
d
->
dmgmt
.
mnodeEpSet
=
*
pEpSet
;
pDn
ode
->
dmgmt
.
mnodeEpSet
=
*
pEpSet
;
for
(
int32_t
i
=
0
;
i
<
pEpSet
->
numOfEps
;
++
i
)
{
dInfo
(
"mnode index:%d %s:%u"
,
i
,
pEpSet
->
fqdn
[
i
],
pEpSet
->
port
[
i
]);
}
dndWUnLockDnode
(
pDn
d
);
dndWUnLockDnode
(
pDn
ode
);
}
static
void
dndPrintDnodes
(
SDnode
*
pDnode
)
{
...
...
source/dnode/mgmt/impl/src/dndMnode.c
浏览文件 @
b6c28a49
...
...
@@ -67,7 +67,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static
int32_t
dndProcessDropMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
static
SMnode
*
dndAcquireMnode
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
NULL
;
int32_t
refCount
=
0
;
...
...
@@ -85,7 +85,7 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) {
}
static
void
dndReleaseMnode
(
SDnode
*
pDnode
,
SMnode
*
pMnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -98,7 +98,7 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
}
static
int32_t
dndReadMnodeFile
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
int32_t
code
=
TSDB_CODE_DND_MNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
300
;
...
...
@@ -152,7 +152,7 @@ PRASE_MNODE_OVER:
}
static
int32_t
dndWriteMnodeFile
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
char
file
[
PATH_MAX
+
20
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s.bak"
,
pMgmt
->
file
);
...
...
@@ -212,7 +212,7 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) {
}
static
void
dndStopMnodeWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
taosWLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
deployed
=
0
;
...
...
@@ -296,7 +296,7 @@ static int32_t dndBuildMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions, SCr
}
static
int32_t
dndOpenMnode
(
SDnode
*
pDnode
,
SMnodeOptions
*
pOptions
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
int32_t
code
=
dndStartMnodeWorker
(
pDnode
);
if
(
code
!=
0
)
{
...
...
@@ -332,7 +332,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
}
static
int32_t
dndAlterMnode
(
SDnode
*
pDnode
,
SMnodeOptions
*
pOptions
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
==
NULL
)
{
...
...
@@ -351,7 +351,7 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
}
static
int32_t
dndDropMnode
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
==
NULL
)
{
...
...
@@ -458,7 +458,7 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
}
static
void
dndProcessMnodeReadQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
...
...
@@ -472,7 +472,7 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
}
static
void
dndProcessMnodeWriteQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
...
...
@@ -486,7 +486,7 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
}
static
void
dndProcessMnodeApplyQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
...
...
@@ -500,7 +500,7 @@ static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
}
static
void
dndProcessMnodeSyncQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
...
...
@@ -532,7 +532,7 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs
}
void
dndProcessMnodeMgmtMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
,
SEpSet
*
pEpSet
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
...
...
@@ -545,7 +545,7 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
}
void
dndProcessMnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
==
NULL
||
dndWriteMnodeMsgToQueue
(
pMnode
,
pMgmt
->
pWriteQ
,
pMsg
)
!=
0
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
terrno
};
...
...
@@ -557,7 +557,7 @@ void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
void
dndProcessMnodeSyncMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
==
NULL
||
dndWriteMnodeMsgToQueue
(
pMnode
,
pMgmt
->
pSyncQ
,
pMsg
)
!=
0
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
terrno
};
...
...
@@ -569,7 +569,7 @@ void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
void
dndProcessMnodeReadMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
==
NULL
||
dndWriteMnodeMsgToQueue
(
pMnode
,
pMgmt
->
pSyncQ
,
pMsg
)
!=
0
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
terrno
};
...
...
@@ -581,7 +581,7 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static
int32_t
dndPutMsgIntoMnodeApplyQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
==
NULL
)
{
...
...
@@ -594,7 +594,7 @@ static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
}
static
int32_t
dndAllocMnodeMgmtQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
pMgmt
->
pMgmtQ
=
tWorkerAllocQueue
(
&
pMgmt
->
mgmtPool
,
NULL
,
(
FProcessItem
)
dndProcessMnodeMgmtQueue
);
if
(
pMgmt
->
pMgmtQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -604,13 +604,13 @@ static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
}
static
void
dndFreeMnodeMgmtQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
tWorkerFreeQueue
(
&
pMgmt
->
mgmtPool
,
pMgmt
->
pMgmtQ
);
pMgmt
->
pMgmtQ
=
NULL
;
}
static
int32_t
dndInitMnodeMgmtWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SWorkerPool
*
pPool
=
&
pMgmt
->
mgmtPool
;
pPool
->
name
=
"mnode-mgmt"
;
pPool
->
min
=
1
;
...
...
@@ -624,13 +624,13 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
}
static
void
dndCleanupMnodeMgmtWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
;
tWorkerCleanup
(
&
pMgmt
->
mgmtPool
);
}
static
int32_t
dndAllocMnodeReadQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
pMgmt
->
pReadQ
=
tWorkerAllocQueue
(
&
pMgmt
->
readPool
,
NULL
,
(
FProcessItem
)
dndProcessMnodeReadQueue
);
if
(
pMgmt
->
pReadQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -640,13 +640,13 @@ static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
}
static
void
dndFreeMnodeReadQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
tWorkerFreeQueue
(
&
pMgmt
->
readPool
,
pMgmt
->
pReadQ
);
pMgmt
->
pReadQ
=
NULL
;
}
static
int32_t
dndInitMnodeReadWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SWorkerPool
*
pPool
=
&
pMgmt
->
readPool
;
pPool
->
name
=
"mnode-read"
;
pPool
->
min
=
0
;
...
...
@@ -660,12 +660,12 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
}
static
void
dndCleanupMnodeReadWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
tWorkerCleanup
(
&
pMgmt
->
readPool
);
}
static
int32_t
dndAllocMnodeWriteQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
pMgmt
->
pWriteQ
=
tWorkerAllocQueue
(
&
pMgmt
->
writePool
,
NULL
,
(
FProcessItem
)
dndProcessMnodeWriteQueue
);
if
(
pMgmt
->
pWriteQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -675,13 +675,13 @@ static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
}
static
void
dndFreeMnodeWriteQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
tWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pMgmt
->
pWriteQ
);
pMgmt
->
pWriteQ
=
NULL
;
}
static
int32_t
dndAllocMnodeApplyQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
pMgmt
->
pApplyQ
=
tWorkerAllocQueue
(
&
pMgmt
->
writePool
,
NULL
,
(
FProcessItem
)
dndProcessMnodeApplyQueue
);
if
(
pMgmt
->
pApplyQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -691,13 +691,13 @@ static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) {
}
static
void
dndFreeMnodeApplyQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
tWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pMgmt
->
pApplyQ
);
pMgmt
->
pApplyQ
=
NULL
;
}
static
int32_t
dndInitMnodeWriteWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SWorkerPool
*
pPool
=
&
pMgmt
->
writePool
;
pPool
->
name
=
"mnode-write"
;
pPool
->
min
=
0
;
...
...
@@ -711,12 +711,12 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
}
static
void
dndCleanupMnodeWriteWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
tWorkerCleanup
(
&
pMgmt
->
writePool
);
}
static
int32_t
dndAllocMnodeSyncQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
pMgmt
->
pSyncQ
=
tWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
NULL
,
(
FProcessItem
)
dndProcessMnodeSyncQueue
);
if
(
pMgmt
->
pSyncQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -726,13 +726,13 @@ static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
}
static
void
dndFreeMnodeSyncQueue
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
tWorkerFreeQueue
(
&
pMgmt
->
syncPool
,
pMgmt
->
pSyncQ
);
pMgmt
->
pSyncQ
=
NULL
;
}
static
int32_t
dndInitMnodeSyncWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SWorkerPool
*
pPool
=
&
pMgmt
->
syncPool
;
pPool
->
name
=
"mnode-sync"
;
pPool
->
min
=
0
;
...
...
@@ -741,13 +741,13 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) {
}
static
void
dndCleanupMnodeSyncWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
tWorkerCleanup
(
&
pMgmt
->
syncPool
);
}
int32_t
dndInitMnode
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode-mnode start to init"
);
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
if
(
dndInitMnodeMgmtWorker
(
pDnode
)
!=
0
)
{
...
...
@@ -791,7 +791,7 @@ int32_t dndInitMnode(SDnode *pDnode) {
}
void
dndCleanupMnode
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
dInfo
(
"dnode-mnode start to clean up"
);
dndStopMnodeWorker
(
pDnode
);
...
...
@@ -801,7 +801,7 @@ void dndCleanupMnode(SDnode *pDnode) {
}
int32_t
dndGetUserAuthFromMnode
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
m
mgmt
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
==
NULL
)
{
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
b6c28a49
...
...
@@ -25,6 +25,10 @@
#include "dndMnode.h"
#include "dndVnodes.h"
#define INTERNAL_USER "_internal"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_secret"
static
void
dndInitMsgFp
(
STransMgmt
*
pMgmt
)
{
// msg from client to dnode
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dndProcessVnodeWriteMsg
;
...
...
@@ -121,7 +125,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
static
void
dndProcessResponse
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SDnode
*
pDnode
=
parent
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
mgmt
;
int32_t
msgType
=
pMsg
->
msgType
;
...
...
@@ -143,19 +147,19 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static
int32_t
dndInitClient
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
t
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
mgmt
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
label
=
"DND-C"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
dndProcessResponse
;
rpcInit
.
sessions
=
TSDB_MAX_VNODES
<<
4
;
rpcInit
.
sessions
=
8
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
pDnode
->
opt
.
shellActivityTimer
*
1000
;
rpcInit
.
user
=
"-internal"
;
rpcInit
.
ckey
=
"-key"
;
rpcInit
.
secret
=
"-secret"
;
rpcInit
.
user
=
INTERNAL_USER
;
rpcInit
.
ckey
=
INTERNAL_CKEY
;
rpcInit
.
secret
=
INTERNAL_SECRET
;
pMgmt
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pMgmt
->
clientRpc
==
NULL
)
{
...
...
@@ -167,7 +171,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
}
static
void
dndCleanupClient
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
t
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
mgmt
;
if
(
pMgmt
->
clientRpc
)
{
rpcClose
(
pMgmt
->
clientRpc
);
pMgmt
->
clientRpc
=
NULL
;
...
...
@@ -176,8 +180,8 @@ static void dndCleanupClient(SDnode *pDnode) {
}
static
void
dndProcessRequest
(
void
*
param
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SDnode
*
pDnode
=
param
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
;
SDnode
*
pDnode
=
param
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
mgmt
;
int32_t
msgType
=
pMsg
->
msgType
;
if
(
msgType
==
TSDB_MSG_TYPE_NETWORK_TEST
)
{
...
...
@@ -218,24 +222,56 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static
void
dndSendMsgToMnodeRecv
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
,
SRpcMsg
*
pRpcRsp
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
t
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
mgmt
;
SEpSet
epSet
=
{
0
};
dndGetMnodeEpSet
(
pDnode
,
&
epSet
);
rpcSendRecv
(
pMgmt
->
clientRpc
,
&
epSet
,
pRpcMsg
,
pRpcRsp
);
}
static
int32_t
dndAuthInternalMsg
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
if
(
strcmp
(
user
,
INTERNAL_USER
)
==
0
)
{
// A simple temporary implementation
char
pass
[
32
]
=
{
0
};
taosEncryptPass
((
uint8_t
*
)(
INTERNAL_SECRET
),
strlen
(
INTERNAL_SECRET
),
pass
);
memcpy
(
secret
,
pass
,
TSDB_KEY_LEN
);
*
spi
=
0
;
*
encrypt
=
0
;
*
ckey
=
0
;
return
0
;
}
else
if
(
strcmp
(
user
,
TSDB_NETTEST_USER
)
==
0
)
{
// A simple temporary implementation
char
pass
[
32
]
=
{
0
};
taosEncryptPass
((
uint8_t
*
)(
TSDB_NETTEST_USER
),
strlen
(
TSDB_NETTEST_USER
),
pass
);
memcpy
(
secret
,
pass
,
TSDB_KEY_LEN
);
*
spi
=
0
;
*
encrypt
=
0
;
*
ckey
=
0
;
return
0
;
}
else
{
return
-
1
;
}
}
static
int32_t
dndRetrieveUserAuthInfo
(
void
*
parent
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
SDnode
*
pDnode
=
parent
;
if
(
dndGetUserAuthFromMnode
(
pDnode
,
user
,
spi
,
encrypt
,
secret
,
ckey
)
!=
0
)
{
if
(
terrno
!=
TSDB_CODE_APP_NOT_READY
)
{
dTrace
(
"failed to get user auth from mnode since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndAuthInternalMsg
(
parent
,
user
,
spi
,
encrypt
,
secret
,
ckey
)
==
0
)
{
dTrace
(
"get internal auth success"
);
return
0
;
}
if
(
dndGetUserAuthFromMnode
(
pDnode
,
user
,
spi
,
encrypt
,
secret
,
ckey
)
==
0
)
{
dTrace
(
"get auth from internal mnode"
);
return
0
;
}
if
(
terrno
!=
TSDB_CODE_APP_NOT_READY
)
{
dTrace
(
"failed to get user auth from internal mnode since %s"
,
terrstr
());
return
-
1
;
}
dDebug
(
"user:%s, send auth msg to mnodes"
,
user
);
dDebug
(
"user:%s, send auth msg to
other
mnodes"
,
user
);
SAuthMsg
*
pMsg
=
rpcMallocCont
(
sizeof
(
SAuthMsg
));
tstrncpy
(
pMsg
->
user
,
user
,
TSDB_USER_LEN
);
...
...
@@ -246,14 +282,14 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
if
(
rpcRsp
.
code
!=
0
)
{
terrno
=
rpcRsp
.
code
;
dError
(
"user:%s, failed to get user auth from mnodes since %s"
,
user
,
terrstr
());
dError
(
"user:%s, failed to get user auth from
other
mnodes since %s"
,
user
,
terrstr
());
}
else
{
SAuthRsp
*
pRsp
=
rpcRsp
.
pCont
;
memcpy
(
secret
,
pRsp
->
secret
,
TSDB_KEY_LEN
);
memcpy
(
ckey
,
pRsp
->
ckey
,
TSDB_KEY_LEN
);
*
spi
=
pRsp
->
spi
;
*
encrypt
=
pRsp
->
encrypt
;
dDebug
(
"user:%s, success to get user auth from mnodes"
,
user
);
dDebug
(
"user:%s, success to get user auth from
other
mnodes"
,
user
);
}
rpcFreeCont
(
rpcRsp
.
pCont
);
...
...
@@ -261,7 +297,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
}
static
int32_t
dndInitServer
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
t
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
mgmt
;
dndInitMsgFp
(
pMgmt
);
int32_t
numOfThreads
=
(
int32_t
)((
pDnode
->
opt
.
numOfCores
*
pDnode
->
opt
.
numOfThreadsPerCore
)
/
2
.
0
);
...
...
@@ -290,7 +326,7 @@ static int32_t dndInitServer(SDnode *pDnode) {
}
static
void
dndCleanupServer
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
t
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
mgmt
;
if
(
pMgmt
->
serverRpc
)
{
rpcClose
(
pMgmt
->
serverRpc
);
pMgmt
->
serverRpc
=
NULL
;
...
...
@@ -317,7 +353,7 @@ void dndCleanupTrans(SDnode *pDnode) {
}
void
dndSendMsgToDnode
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
t
;
STransMgmt
*
pMgmt
=
&
pDnode
->
t
mgmt
;
rpcSendRequest
(
pMgmt
->
clientRpc
,
pEpSet
,
pMsg
,
NULL
);
}
...
...
source/dnode/mnode/impl/src/mnodeAuth.c
浏览文件 @
b6c28a49
...
...
@@ -21,16 +21,5 @@ int32_t mnodeInitAuth() { return 0; }
void
mnodeCleanupAuth
()
{}
int32_t
mnodeRetriveAuth
(
SMnode
*
pMnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
if
(
strcmp
(
user
,
TSDB_NETTEST_USER
)
==
0
)
{
char
pass
[
32
]
=
{
0
};
taosEncryptPass
((
uint8_t
*
)
user
,
strlen
(
user
),
pass
);
*
spi
=
0
;
*
encrypt
=
0
;
*
ckey
=
0
;
memcpy
(
secret
,
pass
,
TSDB_KEY_LEN
);
mDebug
(
"nettest user is authorized"
);
return
0
;
}
return
0
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录