Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2ecdba11
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
2ecdba11
编写于
7月 11, 2022
作者:
dengyihao
提交者:
GitHub
7月 11, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14783 from taosdata/enh/rpcSvrRefactor
enh: avoid rpc mem leak
上级
08259f16
f65d33ea
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
30 addition
and
6 deletion
+30
-6
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+6
-0
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+24
-6
未找到文件。
source/libs/transport/inc/transComm.h
浏览文件 @
2ecdba11
...
...
@@ -311,6 +311,12 @@ void transCtxMerge(STransCtx* dst, STransCtx* src);
void
*
transCtxDumpVal
(
STransCtx
*
ctx
,
int32_t
key
);
void
*
transCtxDumpBrokenlinkVal
(
STransCtx
*
ctx
,
int32_t
*
msgType
);
// request list
typedef
struct
STransReq
{
queue
q
;
void
*
data
;
}
STransReq
;
// queue sending msgs
typedef
struct
{
SArray
*
q
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
2ecdba11
...
...
@@ -29,7 +29,7 @@ typedef struct {
typedef
struct
SSvrConn
{
T_REF_DECLARE
()
uv_tcp_t
*
pTcp
;
uv_write_t
pWriter
;
queue
wreqQueue
;
uv_timer_t
pTimer
;
queue
queue
;
...
...
@@ -331,8 +331,14 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
}
void
uvOnSendCb
(
uv_write_t
*
req
,
int
status
)
{
SSvrConn
*
conn
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
taosMemoryFree
(
req
);
STransReq
*
wreq
=
req
&&
req
->
data
?
req
->
data
:
NULL
;
SSvrConn
*
conn
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
if
(
wreq
!=
NULL
&&
conn
!=
NULL
)
{
QUEUE_REMOVE
(
&
wreq
->
q
);
taosMemoryFree
(
wreq
->
data
);
taosMemoryFree
(
wreq
);
}
if
(
conn
==
NULL
)
return
;
if
(
status
==
0
)
{
...
...
@@ -437,12 +443,16 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
transRefSrvHandle
(
pConn
);
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
));
STransReq
*
wreq
=
taosMemoryCalloc
(
1
,
sizeof
(
STransReq
));
wreq
->
data
=
req
;
req
->
data
=
wreq
;
QUEUE_PUSH
(
&
pConn
->
wreqQueue
,
&
wreq
->
q
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
}
static
void
uvStartSendResp
(
SSvrMsg
*
smsg
)
{
// impl
SSvrConn
*
pConn
=
smsg
->
pConn
;
if
(
pConn
->
broken
==
true
)
{
// persist by
transFreeMsg
(
smsg
->
msg
.
pCont
);
...
...
@@ -639,8 +649,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_tcp_init
(
pThrd
->
loop
,
pConn
->
pTcp
);
pConn
->
pTcp
->
data
=
pConn
;
pConn
->
pWriter
.
data
=
pConn
;
transSetConnOption
((
uv_tcp_t
*
)
pConn
->
pTcp
);
if
(
uv_accept
(
q
,
(
uv_stream_t
*
)(
pConn
->
pTcp
))
==
0
)
{
...
...
@@ -748,6 +756,8 @@ static SSvrConn* createConn(void* hThrd) {
SWorkThrd
*
pThrd
=
hThrd
;
SSvrConn
*
pConn
=
(
SSvrConn
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSvrConn
));
QUEUE_INIT
(
&
pConn
->
wreqQueue
);
QUEUE_INIT
(
&
pConn
->
queue
);
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
...
...
@@ -823,6 +833,14 @@ static void uvDestroyConn(uv_handle_t* handle) {
SSvrMsg
*
msg
=
transQueueGet
(
&
conn
->
srvMsgs
,
i
);
destroySmsg
(
msg
);
}
while
(
!
QUEUE_IS_EMPTY
(
&
conn
->
wreqQueue
))
{
queue
*
h
=
QUEUE_HEAD
(
&
conn
->
wreqQueue
);
QUEUE_REMOVE
(
h
);
STransReq
*
req
=
QUEUE_DATA
(
h
,
STransReq
,
q
);
taosMemoryFree
(
req
->
data
);
taosMemoryFree
(
req
);
}
transQueueDestroy
(
&
conn
->
srvMsgs
);
QUEUE_REMOVE
(
&
conn
->
queue
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录