Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eb3a9a53
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
eb3a9a53
编写于
7月 05, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix rpc mem leak
上级
2968ac85
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
49 addition
and
27 deletion
+49
-27
include/common/tmsg.h
include/common/tmsg.h
+6
-2
include/common/tmsgdef.h
include/common/tmsgdef.h
+7
-0
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+1
-0
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+8
-8
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+27
-17
未找到文件。
include/common/tmsg.h
浏览文件 @
eb3a9a53
...
...
@@ -55,8 +55,12 @@ extern int32_t tMsgDict[];
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) \
(((TYPE) >= 0 && (TYPE) < TDMT_MAX) ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] : 0)
#define TMSG_INFO(TYPE) \
((TYPE) >= 0 && \
((TYPE) < TDMT_DND_MAX_MSG | (TYPE) < TDMT_MND_MAX_MSG | (TYPE) < TDMT_VND_MAX_MSG | (TYPE) < TDMT_SCH_MAX_MSG | \
(TYPE) < TDMT_STREAM_MAX_MSG | (TYPE) < TDMT_MON_MAX_MSG | (TYPE) < TDMT_SYNC_MAX_MSG)) \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
typedef
uint16_t
tmsg_t
;
...
...
include/common/tmsgdef.h
浏览文件 @
eb3a9a53
...
...
@@ -82,6 +82,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_DND_NET_TEST
,
"net-test"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CONFIG_DNODE
,
"config-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_SYSTABLE_RETRIEVE
,
"dnode-retrieve"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_MAX_MSG
,
"dnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_MND_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CONNECT
,
"connect"
,
NULL
,
NULL
)
...
...
@@ -164,6 +165,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_SPLIT_VGROUP
,
"split-vgroup"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_SHOW_VARIABLES
,
"show-variables"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_SERVER_VERSION
,
"server-version"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MAX_MSG
,
"mnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_VND_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBMIT
,
"submit"
,
SSubmitReq
,
SSubmitRsp
)
...
...
@@ -198,6 +200,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_HASHRANGE
,
"alter-hashrange"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMPACT
,
"compact"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_TTL_TABLE
,
"drop-ttl-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MAX_MSG
,
"vnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_SCH_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_QUERY
,
"query"
,
NULL
,
NULL
)
...
...
@@ -209,6 +212,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_SCH_DROP_TASK
,
"drop-task"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_EXPLAIN
,
"explain"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_LINK_BROKEN
,
"link-broken"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_MAX_MSG
,
"sch-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_STREAM_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DEPLOY
,
"stream-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
...
...
@@ -217,6 +221,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DISPATCH
,
"stream-task-dispatch"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_RECOVER
,
"stream-task-recover"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_RETRIEVE
,
"stream-retrieve"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_MAX_MSG
,
"stream-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_MON_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MM_INFO
,
"monitor-minfo"
,
NULL
,
NULL
)
...
...
@@ -227,6 +232,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MON_VM_LOAD
,
"monitor-vload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MM_LOAD
,
"monitor-mload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_QM_LOAD
,
"monitor-qload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MAX_MSG
,
"monitor-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_SYNC_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_TIMEOUT
,
"sync-timer"
,
NULL
,
NULL
)
...
...
@@ -251,6 +257,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_SYNC_LEADER_TRANSFER
,
"sync-leader-transfer"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_SET_MNODE_STANDBY
,
"set-mnode-standby"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_SET_VNODE_STANDBY
,
"set-vnode-standby"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_MAX_MSG
,
"sync-max"
,
NULL
,
NULL
)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
...
...
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
浏览文件 @
eb3a9a53
...
...
@@ -212,6 +212,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupClient
(
pDnode
);
dmCleanupServer
(
pDnode
);
dmClearVars
(
pDnode
);
rpcCleanup
();
dDebug
(
"dnode is closed, ptr:%p"
,
pDnode
);
}
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
eb3a9a53
...
...
@@ -96,8 +96,8 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL
15
// ms retry interval
#define TRANS_CONN_TIMEOUT
3
// connect timeout
#define TRANS_RETRY_INTERVAL
15
// ms retry interval
#define TRANS_CONN_TIMEOUT
3
// connect timeout
typedef
SRpcMsg
STransMsg
;
typedef
SRpcCtx
STransCtx
;
...
...
@@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType;
typedef
enum
{
ConnNormal
,
ConnAcquire
,
ConnRelease
,
ConnBroken
,
ConnInPool
}
ConnStatus
;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_RESERVE_SIZE
(sizeof(STranConnCtx))
#define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define TRANS_MSG_OVERHEAD
(sizeof(STransMsgHead))
#define transHeadFromCont(cont)
((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg)
(msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
#define transContLenFromMsg(msgLen)
(msgLen - sizeof(STransMsgHead));
#define transIsReq(type)
(type & 1U)
#define transLabel(trans) ((STrans*)trans)->label
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
eb3a9a53
...
...
@@ -241,18 +241,19 @@ static void uvHandleReq(SSvrConn* pConn) {
tDebug
(
"conn %p acquired by server app"
,
pConn
);
}
}
STrans
*
pTransInst
=
pConn
->
pTransInst
;
STraceId
*
trace
=
&
pHead
->
traceId
;
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
tGTrace
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
transLabel
(
p
Conn
),
pConn
,
tGTrace
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
transLabel
(
p
TransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
);
}
else
{
tGTrace
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d"
,
transLabel
(
pConn
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
pHead
->
noResp
,
transMsg
.
code
);
tGTrace
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
)
,
transMsg
.
co
ntLen
,
pHead
->
noResp
,
transMsg
.
co
de
);
// no ref here
}
...
...
@@ -265,8 +266,8 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg
.
info
.
refId
=
pConn
->
refId
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
tGTrace
(
"%s handle %p conn: %p translated to app, refId: %"
PRIu64
""
,
transLabel
(
p
Conn
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
tGTrace
(
"%s handle %p conn: %p translated to app, refId: %"
PRIu64
""
,
transLabel
(
p
TransInst
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
assert
(
transMsg
.
info
.
handle
!=
NULL
);
if
(
pHead
->
noResp
==
1
)
{
...
...
@@ -281,7 +282,6 @@ static void uvHandleReq(SSvrConn* pConn) {
transReleaseExHandle
(
transGetRefMgt
(),
pConn
->
refId
);
STrans
*
pTransInst
=
pConn
->
pTransInst
;
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
transMsg
,
NULL
);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}
...
...
@@ -290,14 +290,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SSvrConn
*
conn
=
cli
->
data
;
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
STrans
*
pTransInst
=
conn
->
pTransInst
;
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
tTrace
(
"%s conn %p total read: %d, current read: %d"
,
transLabel
(
conn
->
pTransInst
),
conn
,
pBuf
->
len
,
(
int
)
nread
);
tTrace
(
"%s conn %p total read: %d, current read: %d"
,
transLabel
(
pTransInst
),
conn
,
pBuf
->
len
,
(
int
)
nread
);
if
(
transReadComplete
(
pBuf
))
{
tTrace
(
"%s conn %p alread read complete packet"
,
transLabel
(
conn
->
pTransInst
),
conn
);
tTrace
(
"%s conn %p alread read complete packet"
,
transLabel
(
pTransInst
),
conn
);
uvHandleReq
(
conn
);
}
else
{
tTrace
(
"%s conn %p read partial packet, continue to read"
,
transLabel
(
conn
->
pTransInst
),
conn
);
tTrace
(
"%s conn %p read partial packet, continue to read"
,
transLabel
(
pTransInst
),
conn
);
}
return
;
}
...
...
@@ -305,12 +306,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return
;
}
tError
(
"%s conn %p read error: %s"
,
transLabel
(
conn
->
pTransInst
),
conn
,
uv_err_name
(
nread
));
tError
(
"%s conn %p read error: %s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
nread
));
if
(
nread
<
0
)
{
conn
->
broken
=
true
;
if
(
conn
->
status
==
ConnAcquire
)
{
if
(
conn
->
regArg
.
init
)
{
tTrace
(
"%s conn %p broken, notify server app"
,
transLabel
(
conn
->
pTransInst
),
conn
);
tTrace
(
"%s conn %p broken, notify server app"
,
transLabel
(
pTransInst
),
conn
);
STrans
*
pTransInst
=
conn
->
pTransInst
;
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
(
conn
->
regArg
.
msg
),
NULL
);
memset
(
&
conn
->
regArg
,
0
,
sizeof
(
conn
->
regArg
));
...
...
@@ -414,8 +415,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
char
*
msg
=
(
char
*
)
pHead
;
int32_t
len
=
transMsgLenFromCont
(
pMsg
->
contLen
);
STrans
*
pTransInst
=
pConn
->
pTransInst
;
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
tGTrace
(
"%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d"
,
transLabel
(
p
Conn
->
p
TransInst
),
pConn
,
tGTrace
(
"%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
len
);
pHead
->
msgLen
=
htonl
(
len
);
...
...
@@ -761,9 +763,10 @@ static SSvrConn* createConn(void* hThrd) {
exh
->
refId
=
transAddExHandle
(
transGetRefMgt
(),
exh
);
transAcquireExHandle
(
transGetRefMgt
(),
exh
->
refId
);
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
pConn
->
refId
=
exh
->
refId
;
transRefSrvHandle
(
pConn
);
tTrace
(
"%s handle %p, conn %p created, refId: %"
PRId64
""
,
transLabel
(
pT
hrd
->
pT
ransInst
),
exh
,
pConn
,
pConn
->
refId
);
tTrace
(
"%s handle %p, conn %p created, refId: %"
PRId64
""
,
transLabel
(
pTransInst
),
exh
,
pConn
,
pConn
->
refId
);
return
pConn
;
}
...
...
@@ -812,7 +815,13 @@ static void uvDestroyConn(uv_handle_t* handle) {
transReleaseExHandle
(
transGetRefMgt
(),
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
tDebug
(
"%s conn %p destroy"
,
transLabel
(
thrd
->
pTransInst
),
conn
);
STrans
*
pTransInst
=
thrd
->
pTransInst
;
tDebug
(
"%s conn %p destroy"
,
transLabel
(
pTransInst
),
conn
);
for
(
int
i
=
0
;
i
<
transQueueSize
(
&
conn
->
srvMsgs
);
i
++
)
{
SSvrMsg
*
msg
=
transQueueGet
(
&
conn
->
srvMsgs
,
i
);
destroySmsg
(
msg
);
}
transQueueDestroy
(
&
conn
->
srvMsgs
);
QUEUE_REMOVE
(
&
conn
->
queue
);
...
...
@@ -1103,7 +1112,8 @@ void transRegisterMsg(const STransMsg* msg) {
m
->
msg
=
tmsg
;
m
->
type
=
Register
;
tTrace
(
"%s conn %p start to register brokenlink callback"
,
transLabel
(
pThrd
->
pTransInst
),
exh
->
handle
);
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
tTrace
(
"%s conn %p start to register brokenlink callback"
,
transLabel
(
pTransInst
),
exh
->
handle
);
transAsyncSend
(
pThrd
->
asyncPool
,
&
m
->
q
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
return
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录