Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2eb30ee0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2eb30ee0
编写于
4月 02, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact transport
上级
20f1e3f5
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
106 addition
and
115 deletion
+106
-115
source/dnode/mgmt/dm/inc/dmInt.h
source/dnode/mgmt/dm/inc/dmInt.h
+1
-0
source/dnode/mgmt/dm/src/dmInt.c
source/dnode/mgmt/dm/src/dmInt.c
+5
-9
source/dnode/mgmt/dm/src/dmMsg.c
source/dnode/mgmt/dm/src/dmMsg.c
+3
-1
source/dnode/mgmt/main/inc/dnd.h
source/dnode/mgmt/main/inc/dnd.h
+5
-7
source/dnode/mgmt/main/inc/dndInt.h
source/dnode/mgmt/main/inc/dndInt.h
+0
-1
source/dnode/mgmt/main/src/dndExec.c
source/dnode/mgmt/main/src/dndExec.c
+0
-63
source/dnode/mgmt/main/src/dndInt.c
source/dnode/mgmt/main/src/dndInt.c
+1
-0
source/dnode/mgmt/main/src/dndTransport.c
source/dnode/mgmt/main/src/dndTransport.c
+91
-34
未找到文件。
source/dnode/mgmt/dm/inc/dmInt.h
浏览文件 @
2eb30ee0
...
...
@@ -33,6 +33,7 @@ typedef struct SDnodeMgmt {
SRWLatch
latch
;
SSingleWorker
mgmtWorker
;
SSingleWorker
statusWorker
;
SMsgCb
msgCb
;
const
char
*
path
;
SDnode
*
pDnode
;
SMgmtWrapper
*
pWrapper
;
...
...
source/dnode/mgmt/dm/src/dmInt.c
浏览文件 @
2eb30ee0
...
...
@@ -112,17 +112,14 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
return
-
1
;
}
if
(
dndInitServer
(
pDnode
)
!=
0
)
{
dError
(
"failed to init trans server since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndInitClient
(
pDnode
)
!=
0
)
{
dError
(
"failed to init trans client since %s"
,
terrstr
());
if
(
dndInitTrans
(
pDnode
)
!=
0
)
{
dError
(
"failed to init transport since %s"
,
terrstr
());
return
-
1
;
}
pWrapper
->
pMgmt
=
pMgmt
;
pMgmt
->
msgCb
=
dndCreateMsgcb
(
pWrapper
);
dInfo
(
"dnode-mgmt is initialized"
);
return
0
;
}
...
...
@@ -151,8 +148,7 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
taosMemoryFree
(
pMgmt
);
pWrapper
->
pMgmt
=
NULL
;
dndCleanupServer
(
pDnode
);
dndCleanupClient
(
pDnode
);
dndCleanupTrans
(
pDnode
);
dInfo
(
"dnode-mgmt is cleaned up"
);
}
...
...
source/dnode/mgmt/dm/src/dmMsg.c
浏览文件 @
2eb30ee0
...
...
@@ -57,7 +57,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
pMgmt
->
statusSent
=
1
;
dTrace
(
"send req:%s to mnode, app:%p"
,
TMSG_INFO
(
rpcMsg
.
msgType
),
rpcMsg
.
ahandle
);
dndSendReqToMnode
(
pMgmt
->
pWrapper
,
&
rpcMsg
);
SEpSet
epSet
=
{
0
};
dmGetMnodeEpSet
(
pMgmt
,
&
epSet
);
tmsgSendReq
(
&
pMgmt
->
msgCb
,
&
epSet
,
&
rpcMsg
);
}
static
void
dmUpdateDnodeCfg
(
SDnodeMgmt
*
pMgmt
,
SDnodeCfg
*
pCfg
)
{
...
...
source/dnode/mgmt/main/inc/dnd.h
浏览文件 @
2eb30ee0
...
...
@@ -92,6 +92,7 @@ typedef struct SMgmtWrapper {
char
*
path
;
int32_t
refCount
;
SRWLatch
latch
;
ENodeType
ntype
;
bool
deployed
;
bool
required
;
EProcType
procType
;
...
...
@@ -160,13 +161,10 @@ const char *dndNodeProcStr(ENodeType ntype);
const
char
*
dndEventStr
(
EDndEvent
ev
);
// dndTransport.h
int32_t
dndInitServer
(
SDnode
*
pDnode
);
void
dndCleanupServer
(
SDnode
*
pDnode
);
int32_t
dndInitClient
(
SDnode
*
pDnode
);
void
dndCleanupClient
(
SDnode
*
pDnode
);
int32_t
dndSendReqToMnode
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
dndSendReq
(
SMgmtWrapper
*
pWrapper
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
SMsgCb
dndCreateMsgcb
(
SMgmtWrapper
*
pWrapper
);
int32_t
dndInitTrans
(
SDnode
*
pDnode
);
void
dndCleanupTrans
(
SDnode
*
pDnode
);
SMsgCb
dndCreateMsgcb
(
SMgmtWrapper
*
pWrapper
);
SProcCfg
dndGenProcCfg
(
SMgmtWrapper
*
pWrapper
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/main/inc/dndInt.h
浏览文件 @
2eb30ee0
...
...
@@ -49,7 +49,6 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndTransport.c
int32_t
dndInitMsgHandle
(
SDnode
*
pDnode
);
void
dndSendRpcRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
);
// dndFile.c
TdFilePtr
dndCheckRunning
(
const
char
*
dataDir
);
...
...
source/dnode/mgmt/main/src/dndExec.c
浏览文件 @
2eb30ee0
...
...
@@ -65,53 +65,6 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
dDebug
(
"node:%s, mgmt has been closed"
,
pWrapper
->
name
);
}
static
void
dndConsumeChildQueue
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
,
int16_t
msgLen
,
void
*
pCont
,
int32_t
contLen
,
ProcFuncType
ftype
)
{
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
pRpc
->
pCont
=
pCont
;
dTrace
(
"msg:%p, get from child queue, handle:%p app:%p"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
);
NodeMsgFp
msgFp
=
pWrapper
->
msgFps
[
TMSG_INDEX
(
pRpc
->
msgType
)];
int32_t
code
=
(
*
msgFp
)(
pWrapper
,
pMsg
);
if
(
code
!=
0
)
{
dError
(
"msg:%p, failed to process since code:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
if
(
pRpc
->
msgType
&
1U
)
{
SRpcMsg
rsp
=
{.
handle
=
pRpc
->
handle
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
terrno
};
tmsgSendRsp
(
&
rsp
);
}
dTrace
(
"msg:%p, is freed"
,
pMsg
);
taosFreeQitem
(
pMsg
);
rpcFreeCont
(
pCont
);
}
}
static
void
dndConsumeParentQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
,
int16_t
msgLen
,
void
*
pCont
,
int32_t
contLen
,
ProcFuncType
ftype
)
{
pMsg
->
pCont
=
pCont
;
dTrace
(
"msg:%p, get from parent queue, ftype:%d handle:%p, app:%p"
,
pMsg
,
ftype
,
pMsg
->
handle
,
pMsg
->
ahandle
);
switch
(
ftype
)
{
case
PROC_REGIST
:
rpcRegisterBrokenLinkArg
(
pMsg
);
break
;
case
PROC_RELEASE
:
rpcReleaseHandle
(
pMsg
->
handle
,
(
int8_t
)
pMsg
->
code
);
rpcFreeCont
(
pCont
);
break
;
case
PROC_REQ
:
dndSendReqToMnode
(
pWrapper
,
pMsg
);
// dndSendReq(pWrapper, (const SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break
;
case
PROC_RSP
:
dndSendRpcRsp
(
pWrapper
,
pMsg
);
break
;
default:
break
;
}
taosMemoryFree
(
pMsg
);
}
static
int32_t
dndNewProc
(
SMgmtWrapper
*
pWrapper
,
ENodeType
n
)
{
char
tstr
[
8
]
=
{
0
};
...
...
@@ -135,22 +88,6 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
return
0
;
}
static
SProcCfg
dndGenProcCfg
(
SMgmtWrapper
*
pWrapper
)
{
SProcCfg
cfg
=
{.
childConsumeFp
=
(
ProcConsumeFp
)
dndConsumeChildQueue
,
.
childMallocHeadFp
=
(
ProcMallocFp
)
taosAllocateQitem
,
.
childFreeHeadFp
=
(
ProcFreeFp
)
taosFreeQitem
,
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentConsumeFp
=
(
ProcConsumeFp
)
dndConsumeParentQueue
,
.
parentMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
shm
,
.
pParent
=
pWrapper
,
.
name
=
pWrapper
->
name
};
return
cfg
;
}
static
int32_t
dndRunInSingleProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode run in single process"
);
...
...
source/dnode/mgmt/main/src/dndInt.c
浏览文件 @
2eb30ee0
...
...
@@ -95,6 +95,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
pWrapper
->
path
=
strdup
(
path
);
pWrapper
->
shm
.
id
=
-
1
;
pWrapper
->
pDnode
=
pDnode
;
pWrapper
->
ntype
=
n
;
if
(
pWrapper
->
path
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_OVER
;
...
...
source/dnode/mgmt/main/src/dndTransport.c
浏览文件 @
2eb30ee0
...
...
@@ -61,7 +61,7 @@ static void dndProcessResponse(SDnode *pDnode, SRpcMsg *pRsp, SEpSet *pEpSet) {
}
}
int32_t
dndInitClient
(
SDnode
*
pDnode
)
{
static
int32_t
dndInitClient
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
;
...
...
@@ -91,7 +91,7 @@ int32_t dndInitClient(SDnode *pDnode) {
return
0
;
}
void
dndCleanupClient
(
SDnode
*
pDnode
)
{
static
void
dndCleanupClient
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
if
(
pMgmt
->
clientRpc
)
{
rpcClose
(
pMgmt
->
clientRpc
);
...
...
@@ -227,7 +227,7 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch
return
rpcRsp
.
code
;
}
int32_t
dndInitServer
(
SDnode
*
pDnode
)
{
static
int32_t
dndInitServer
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
int32_t
numOfThreads
=
(
int32_t
)((
tsNumOfCores
*
tsNumOfThreadsPerCore
)
/
2
.
0
);
...
...
@@ -257,7 +257,7 @@ int32_t dndInitServer(SDnode *pDnode) {
return
0
;
}
void
dndCleanupServer
(
SDnode
*
pDnode
)
{
static
void
dndCleanupServer
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
if
(
pMgmt
->
serverRpc
)
{
rpcClose
(
pMgmt
->
serverRpc
);
...
...
@@ -266,6 +266,17 @@ void dndCleanupServer(SDnode *pDnode) {
}
}
int32_t
dndInitTrans
(
SDnode
*
pDnode
)
{
if
(
dndInitServer
(
pDnode
)
!=
0
)
return
-
1
;
if
(
dndInitClient
(
pDnode
)
!=
0
)
return
-
1
;
return
0
;
}
void
dndCleanupTrans
(
SDnode
*
pDnode
)
{
dndCleanupServer
(
pDnode
);
dndCleanupClient
(
pDnode
);
}
int32_t
dndInitMsgHandle
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
trans
;
...
...
@@ -315,46 +326,28 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p
return
0
;
}
int32_t
dndSendReqToMnode
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pReq
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
STransMgmt
*
pTrans
=
&
pDnode
->
trans
;
SEpSet
epSet
=
{
0
};
SMgmtWrapper
*
pWrapper2
=
dndAcquireWrapper
(
pDnode
,
DNODE
);
if
(
pWrapper2
!=
NULL
)
{
dmGetMnodeEpSet
(
pWrapper2
->
pMgmt
,
&
epSet
);
dndReleaseWrapper
(
pWrapper2
);
}
return
dndSendRpcReq
(
pTrans
,
&
epSet
,
pReq
);
}
void
dndSendRpcRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
)
{
static
void
dndSendRpcRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
)
{
if
(
pRsp
->
code
==
TSDB_CODE_APP_NOT_READY
)
{
SMgmtWrapper
*
pDnodeWrapper
=
dndAcquireWrapper
(
pWrapper
->
pDnode
,
DNODE
);
if
(
pDnodeWrapper
!=
NULL
)
{
dmSendRedirectRsp
(
pDnodeWrapper
->
pMgmt
,
pRsp
);
dndReleaseWrapper
(
pDnodeWrapper
);
}
else
{
rpcSendResponse
(
pRsp
);
if
(
pWrapper
->
ntype
==
MNODE
)
{
dmSendRedirectRsp
(
pWrapper
->
pMgmt
,
pRsp
);
return
;
}
}
else
{
rpcSendResponse
(
pRsp
);
}
rpcSendResponse
(
pRsp
);
}
int32_t
dndSendReq
(
SMgmtWrapper
*
pWrapper
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
if
(
dndGetStatus
(
pDnode
)
!=
DND_STAT_RUNNING
)
{
static
int32_t
dndSendReq
(
SMgmtWrapper
*
pWrapper
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
if
(
dndGetStatus
(
pWrapper
->
pDnode
)
!=
DND_STAT_RUNNING
)
{
terrno
=
TSDB_CODE_DND_OFFLINE
;
dError
(
"failed to send rpc msg since %s, handle:%p"
,
terrstr
(),
pReq
->
handle
);
return
-
1
;
}
if
(
pWrapper
->
procType
!=
PROC_CHILD
)
{
return
dndSendRpcReq
(
&
pDnode
->
trans
,
pEpSet
,
pReq
);
return
dndSendRpcReq
(
&
p
Wrapper
->
p
Dnode
->
trans
,
pEpSet
,
pReq
);
}
else
{
int32_t
headLen
=
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
);
char
*
pHead
=
taosMemoryMalloc
(
headLen
);
char
*
pHead
=
taosMemoryMalloc
(
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
));
if
(
pHead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -362,8 +355,8 @@ int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq)
memcpy
(
pHead
,
pReq
,
sizeof
(
SRpcMsg
));
memcpy
(
pHead
+
sizeof
(
SRpcMsg
),
pEpSet
,
sizeof
(
SEpSet
));
taosProcPutToParentQ
(
pWrapper
->
pProc
,
pReq
,
headLen
,
pReq
->
pCont
,
pReq
->
contLen
,
PROC_REQ
);
taosProcPutToParentQ
(
pWrapper
->
pProc
,
pHead
,
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
),
pReq
->
pCont
,
pReq
->
contLen
,
PROC_REQ
);
taosMemoryFree
(
pHead
);
return
0
;
}
...
...
@@ -404,3 +397,67 @@ SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
};
return
msgCb
;
}
static
void
dndConsumeChildQueue
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
,
int16_t
msgLen
,
void
*
pCont
,
int32_t
contLen
,
ProcFuncType
ftype
)
{
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
pRpc
->
pCont
=
pCont
;
dTrace
(
"msg:%p, get from child queue, handle:%p app:%p"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
);
NodeMsgFp
msgFp
=
pWrapper
->
msgFps
[
TMSG_INDEX
(
pRpc
->
msgType
)];
int32_t
code
=
(
*
msgFp
)(
pWrapper
,
pMsg
);
if
(
code
!=
0
)
{
dError
(
"msg:%p, failed to process since code:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
if
(
pRpc
->
msgType
&
1U
)
{
SRpcMsg
rsp
=
{.
handle
=
pRpc
->
handle
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
terrno
};
dndSendRsp
(
pWrapper
,
&
rsp
);
}
dTrace
(
"msg:%p, is freed"
,
pMsg
);
taosFreeQitem
(
pMsg
);
rpcFreeCont
(
pCont
);
}
}
static
void
dndConsumeParentQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
,
int16_t
msgLen
,
void
*
pCont
,
int32_t
contLen
,
ProcFuncType
ftype
)
{
pMsg
->
pCont
=
pCont
;
dTrace
(
"msg:%p, get from parent queue, ftype:%d handle:%p, app:%p"
,
pMsg
,
ftype
,
pMsg
->
handle
,
pMsg
->
ahandle
);
switch
(
ftype
)
{
case
PROC_REGIST
:
rpcRegisterBrokenLinkArg
(
pMsg
);
break
;
case
PROC_RELEASE
:
rpcReleaseHandle
(
pMsg
->
handle
,
(
int8_t
)
pMsg
->
code
);
rpcFreeCont
(
pCont
);
break
;
case
PROC_REQ
:
dndSendRpcReq
(
&
pWrapper
->
pDnode
->
trans
,
(
SEpSet
*
)((
char
*
)
pMsg
+
sizeof
(
SRpcMsg
)),
pMsg
);
break
;
case
PROC_RSP
:
dndSendRpcRsp
(
pWrapper
,
pMsg
);
break
;
default:
break
;
}
taosMemoryFree
(
pMsg
);
}
SProcCfg
dndGenProcCfg
(
SMgmtWrapper
*
pWrapper
)
{
SProcCfg
cfg
=
{.
childConsumeFp
=
(
ProcConsumeFp
)
dndConsumeChildQueue
,
.
childMallocHeadFp
=
(
ProcMallocFp
)
taosAllocateQitem
,
.
childFreeHeadFp
=
(
ProcFreeFp
)
taosFreeQitem
,
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentConsumeFp
=
(
ProcConsumeFp
)
dndConsumeParentQueue
,
.
parentMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
shm
,
.
pParent
=
pWrapper
,
.
name
=
pWrapper
->
name
};
return
cfg
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录