Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7614974d
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7614974d
编写于
7月 12, 2022
作者:
dengyihao
提交者:
GitHub
7月 12, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14790 from taosdata/enh/rpcRefactor2
enh : refactor rpc
上级
7069380a
7142f133
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
59 addition
and
30 deletion
+59
-30
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+5
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+8
-8
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+42
-0
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+4
-22
未找到文件。
source/libs/transport/inc/transComm.h
浏览文件 @
7614974d
...
@@ -317,6 +317,11 @@ typedef struct STransReq {
...
@@ -317,6 +317,11 @@ typedef struct STransReq {
void
*
data
;
void
*
data
;
}
STransReq
;
}
STransReq
;
void
transReqQueueInit
(
queue
*
q
);
void
*
transReqQueuePushReq
(
queue
*
q
);
void
*
transReqQueueRemove
(
void
*
arg
);
void
transReqQueueClear
(
queue
*
q
);
// queue sending msgs
// queue sending msgs
typedef
struct
{
typedef
struct
{
SArray
*
q
;
SArray
*
q
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
7614974d
...
@@ -19,7 +19,7 @@ typedef struct SCliConn {
...
@@ -19,7 +19,7 @@ typedef struct SCliConn {
T_REF_DECLARE
()
T_REF_DECLARE
()
uv_connect_t
connReq
;
uv_connect_t
connReq
;
uv_stream_t
*
stream
;
uv_stream_t
*
stream
;
uv_write_t
writeReq
;
queue
wreqQueue
;
void
*
hostThrd
;
void
*
hostThrd
;
...
@@ -586,9 +586,10 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
...
@@ -586,9 +586,10 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
conn
->
stream
->
data
=
conn
;
conn
->
stream
->
data
=
conn
;
conn
->
writeReq
.
data
=
conn
;
conn
->
connReq
.
data
=
conn
;
conn
->
connReq
.
data
=
conn
;
transReqQueueInit
(
&
conn
->
wreqQueue
);
transQueueInit
(
&
conn
->
cliMsgs
,
NULL
);
transQueueInit
(
&
conn
->
cliMsgs
,
NULL
);
QUEUE_INIT
(
&
conn
->
conn
);
QUEUE_INIT
(
&
conn
->
conn
);
conn
->
hostThrd
=
pThrd
;
conn
->
hostThrd
=
pThrd
;
...
@@ -627,6 +628,8 @@ static void cliDestroy(uv_handle_t* handle) {
...
@@ -627,6 +628,8 @@ static void cliDestroy(uv_handle_t* handle) {
transCtxCleanup
(
&
conn
->
ctx
);
transCtxCleanup
(
&
conn
->
ctx
);
transQueueDestroy
(
&
conn
->
cliMsgs
);
transQueueDestroy
(
&
conn
->
cliMsgs
);
tTrace
(
"%s conn %p destroy successfully"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
tTrace
(
"%s conn %p destroy successfully"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
transReqQueueClear
(
&
conn
->
wreqQueue
);
transDestroyBuffer
(
&
conn
->
readBuf
);
transDestroyBuffer
(
&
conn
->
readBuf
);
taosMemoryFree
(
conn
);
taosMemoryFree
(
conn
);
}
}
...
@@ -649,11 +652,8 @@ static bool cliHandleNoResp(SCliConn* conn) {
...
@@ -649,11 +652,8 @@ static bool cliHandleNoResp(SCliConn* conn) {
return
res
;
return
res
;
}
}
static
void
cliSendCb
(
uv_write_t
*
req
,
int
status
)
{
static
void
cliSendCb
(
uv_write_t
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
SCliConn
*
pConn
=
transReqQueueRemove
(
req
);
taosMemoryFree
(
req
);
if
(
pConn
==
NULL
)
return
;
if
(
pConn
==
NULL
)
{
return
;
}
if
(
status
==
0
)
{
if
(
status
==
0
)
{
tTrace
(
"%s conn %p data already was written out"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
tTrace
(
"%s conn %p data already was written out"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
...
@@ -711,7 +711,7 @@ void cliSend(SCliConn* pConn) {
...
@@ -711,7 +711,7 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP
(
pConn
);
CONN_SET_PERSIST_BY_APP
(
pConn
);
}
}
uv_write_t
*
req
=
t
aosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
)
);
uv_write_t
*
req
=
t
ransReqQueuePushReq
(
&
pConn
->
wreqQueue
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
cliSendCb
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
cliSendCb
);
return
;
return
;
_RETURN:
_RETURN:
...
...
source/libs/transport/src/transComm.c
浏览文件 @
7614974d
...
@@ -293,6 +293,48 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
...
@@ -293,6 +293,48 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
return
ret
;
return
ret
;
}
}
void
transReqQueueInit
(
queue
*
q
)
{
// init req queue
QUEUE_INIT
(
q
);
}
void
*
transReqQueuePushReq
(
queue
*
q
)
{
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
));
STransReq
*
wreq
=
taosMemoryCalloc
(
1
,
sizeof
(
STransReq
));
wreq
->
data
=
req
;
req
->
data
=
wreq
;
QUEUE_PUSH
(
q
,
&
wreq
->
q
);
return
req
;
}
void
*
transReqQueueRemove
(
void
*
arg
)
{
void
*
ret
=
NULL
;
uv_write_t
*
req
=
arg
;
STransReq
*
wreq
=
req
&&
req
->
data
?
req
->
data
:
NULL
;
assert
(
wreq
->
data
==
req
);
if
(
wreq
==
NULL
||
wreq
->
data
==
NULL
)
{
taosMemoryFree
(
wreq
->
data
);
taosMemoryFree
(
wreq
);
return
req
;
}
QUEUE_REMOVE
(
&
wreq
->
q
);
ret
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
taosMemoryFree
(
wreq
->
data
);
taosMemoryFree
(
wreq
);
return
ret
;
}
void
transReqQueueClear
(
queue
*
q
)
{
while
(
!
QUEUE_IS_EMPTY
(
q
))
{
queue
*
h
=
QUEUE_HEAD
(
q
);
QUEUE_REMOVE
(
h
);
STransReq
*
req
=
QUEUE_DATA
(
h
,
STransReq
,
q
);
taosMemoryFree
(
req
->
data
);
taosMemoryFree
(
req
);
}
}
void
transQueueInit
(
STransQueue
*
queue
,
void
(
*
freeFunc
)(
const
void
*
arg
))
{
void
transQueueInit
(
STransQueue
*
queue
,
void
(
*
freeFunc
)(
const
void
*
arg
))
{
queue
->
q
=
taosArrayInit
(
2
,
sizeof
(
void
*
));
queue
->
q
=
taosArrayInit
(
2
,
sizeof
(
void
*
));
queue
->
freeFunc
=
(
void
(
*
)(
const
void
*
))
freeFunc
;
queue
->
freeFunc
=
(
void
(
*
)(
const
void
*
))
freeFunc
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
7614974d
...
@@ -331,14 +331,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
...
@@ -331,14 +331,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
}
}
void
uvOnSendCb
(
uv_write_t
*
req
,
int
status
)
{
void
uvOnSendCb
(
uv_write_t
*
req
,
int
status
)
{
STransReq
*
wreq
=
req
&&
req
->
data
?
req
->
data
:
NULL
;
SSvrConn
*
conn
=
transReqQueueRemove
(
req
);
SSvrConn
*
conn
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
if
(
wreq
!=
NULL
&&
conn
!=
NULL
)
{
QUEUE_REMOVE
(
&
wreq
->
q
);
taosMemoryFree
(
wreq
->
data
);
taosMemoryFree
(
wreq
);
}
if
(
conn
==
NULL
)
return
;
if
(
conn
==
NULL
)
return
;
if
(
status
==
0
)
{
if
(
status
==
0
)
{
...
@@ -442,12 +435,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
...
@@ -442,12 +435,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
transRefSrvHandle
(
pConn
);
transRefSrvHandle
(
pConn
);
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
));
uv_write_t
*
req
=
transReqQueuePushReq
(
&
pConn
->
wreqQueue
);
STransReq
*
wreq
=
taosMemoryCalloc
(
1
,
sizeof
(
STransReq
));
wreq
->
data
=
req
;
req
->
data
=
wreq
;
QUEUE_PUSH
(
&
pConn
->
wreqQueue
,
&
wreq
->
q
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
}
}
static
void
uvStartSendResp
(
SSvrMsg
*
smsg
)
{
static
void
uvStartSendResp
(
SSvrMsg
*
smsg
)
{
...
@@ -757,7 +745,7 @@ static SSvrConn* createConn(void* hThrd) {
...
@@ -757,7 +745,7 @@ static SSvrConn* createConn(void* hThrd) {
SSvrConn
*
pConn
=
(
SSvrConn
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSvrConn
));
SSvrConn
*
pConn
=
(
SSvrConn
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSvrConn
));
QUEUE_INIT
(
&
pConn
->
wreqQueue
);
transReqQueueInit
(
&
pConn
->
wreqQueue
);
QUEUE_INIT
(
&
pConn
->
queue
);
QUEUE_INIT
(
&
pConn
->
queue
);
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
...
@@ -834,13 +822,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
...
@@ -834,13 +822,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
destroySmsg
(
msg
);
destroySmsg
(
msg
);
}
}
while
(
!
QUEUE_IS_EMPTY
(
&
conn
->
wreqQueue
))
{
transReqQueueClear
(
&
conn
->
wreqQueue
);
queue
*
h
=
QUEUE_HEAD
(
&
conn
->
wreqQueue
);
QUEUE_REMOVE
(
h
);
STransReq
*
req
=
QUEUE_DATA
(
h
,
STransReq
,
q
);
taosMemoryFree
(
req
->
data
);
taosMemoryFree
(
req
);
}
transQueueDestroy
(
&
conn
->
srvMsgs
);
transQueueDestroy
(
&
conn
->
srvMsgs
);
QUEUE_REMOVE
(
&
conn
->
queue
);
QUEUE_REMOVE
(
&
conn
->
queue
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录