Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eb5dbffc
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
eb5dbffc
编写于
7月 22, 2022
作者:
dengyihao
提交者:
GitHub
7月 22, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15257 from taosdata/fix/fixRpcCode
fix: refactor rpc code
上级
43d20d5e
46d4bf90
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
71 addition
and
47 deletion
+71
-47
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+6
-6
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+32
-25
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+30
-13
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+3
-3
未找到文件。
source/libs/transport/inc/transComm.h
浏览文件 @
eb5dbffc
...
@@ -229,8 +229,8 @@ typedef struct {
...
@@ -229,8 +229,8 @@ typedef struct {
int8_t
stop
;
int8_t
stop
;
}
SAsyncPool
;
}
SAsyncPool
;
SAsyncPool
*
trans
CreateAsyncPool
(
uv_loop_t
*
loop
,
int
sz
,
void
*
arg
,
AsyncCB
cb
);
SAsyncPool
*
trans
AsyncPoolCreate
(
uv_loop_t
*
loop
,
int
sz
,
void
*
arg
,
AsyncCB
cb
);
void
trans
DestroyAsyncPool
(
SAsyncPool
*
pool
);
void
trans
AsyncPoolDestroy
(
SAsyncPool
*
pool
);
int
transAsyncSend
(
SAsyncPool
*
pool
,
queue
*
mq
);
int
transAsyncSend
(
SAsyncPool
*
pool
,
queue
*
mq
);
bool
transAsyncPoolIsEmpty
(
SAsyncPool
*
pool
);
bool
transAsyncPoolIsEmpty
(
SAsyncPool
*
pool
);
...
@@ -322,7 +322,7 @@ typedef struct STransReq {
...
@@ -322,7 +322,7 @@ typedef struct STransReq {
}
STransReq
;
}
STransReq
;
void
transReqQueueInit
(
queue
*
q
);
void
transReqQueueInit
(
queue
*
q
);
void
*
transReqQueuePush
Req
(
queue
*
q
);
void
*
transReqQueuePush
(
queue
*
q
);
void
*
transReqQueueRemove
(
void
*
arg
);
void
*
transReqQueueRemove
(
void
*
arg
);
void
transReqQueueClear
(
queue
*
q
);
void
transReqQueueClear
(
queue
*
q
);
...
@@ -393,9 +393,9 @@ typedef struct SDelayQueue {
...
@@ -393,9 +393,9 @@ typedef struct SDelayQueue {
uv_loop_t
*
loop
;
uv_loop_t
*
loop
;
}
SDelayQueue
;
}
SDelayQueue
;
int
transDQCreate
(
uv_loop_t
*
loop
,
SDelayQueue
**
queue
);
int
transDQCreate
(
uv_loop_t
*
loop
,
SDelayQueue
**
queue
);
void
transDQDestroy
(
SDelayQueue
*
queue
,
void
(
*
freeFunc
)(
void
*
arg
));
void
transDQDestroy
(
SDelayQueue
*
queue
,
void
(
*
freeFunc
)(
void
*
arg
));
int
transDQSched
(
SDelayQueue
*
queue
,
void
(
*
func
)(
void
*
arg
),
void
*
arg
,
uint64_t
timeoutMs
);
SDelayTask
*
transDQSched
(
SDelayQueue
*
queue
,
void
(
*
func
)(
void
*
arg
),
void
*
arg
,
uint64_t
timeoutMs
);
bool
transEpSetIsEqual
(
SEpSet
*
a
,
SEpSet
*
b
);
bool
transEpSetIsEqual
(
SEpSet
*
a
,
SEpSet
*
b
);
/*
/*
...
...
source/libs/transport/src/transCli.c
浏览文件 @
eb5dbffc
...
@@ -26,7 +26,7 @@ typedef struct SCliConn {
...
@@ -26,7 +26,7 @@ typedef struct SCliConn {
SConnBuffer
readBuf
;
SConnBuffer
readBuf
;
STransQueue
cliMsgs
;
STransQueue
cliMsgs
;
queue
conn
;
queue
q
;
uint64_t
expireTime
;
uint64_t
expireTime
;
STransCtx
ctx
;
STransCtx
ctx
;
...
@@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) {
...
@@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) {
while
(
p
!=
NULL
)
{
while
(
p
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
p
->
conn
))
{
while
(
!
QUEUE_IS_EMPTY
(
&
p
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
p
->
conn
);
queue
*
h
=
QUEUE_HEAD
(
&
p
->
conn
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
q
);
if
(
c
->
expireTime
<
currentTime
)
{
if
(
c
->
expireTime
<
currentTime
)
{
QUEUE_REMOVE
(
h
);
QUEUE_REMOVE
(
h
);
transUnrefCliHandle
(
c
);
transUnrefCliHandle
(
c
);
...
@@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) {
...
@@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) {
while
(
connList
!=
NULL
)
{
while
(
connList
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
connList
->
conn
))
{
while
(
!
QUEUE_IS_EMPTY
(
&
connList
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
connList
->
conn
);
queue
*
h
=
QUEUE_HEAD
(
&
connList
->
conn
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
q
);
cliDestroyConn
(
c
,
true
);
cliDestroyConn
(
c
,
true
);
}
}
connList
=
taosHashIterate
((
SHashObj
*
)
pool
,
connList
);
connList
=
taosHashIterate
((
SHashObj
*
)
pool
,
connList
);
...
@@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
...
@@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
return
NULL
;
return
NULL
;
}
}
queue
*
h
=
QUEUE_HEAD
(
&
plist
->
conn
);
queue
*
h
=
QUEUE_HEAD
(
&
plist
->
conn
);
SCliConn
*
conn
=
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
SCliConn
*
conn
=
QUEUE_DATA
(
h
,
SCliConn
,
q
);
conn
->
status
=
ConnNormal
;
conn
->
status
=
ConnNormal
;
QUEUE_REMOVE
(
&
conn
->
conn
);
QUEUE_REMOVE
(
&
conn
->
q
);
QUEUE_INIT
(
&
conn
->
conn
);
QUEUE_INIT
(
&
conn
->
q
);
assert
(
h
==
&
conn
->
conn
);
assert
(
h
==
&
conn
->
q
);
return
conn
;
return
conn
;
}
}
static
int32_t
allocConnRef
(
SCliConn
*
conn
,
bool
update
)
{
static
int32_t
allocConnRef
(
SCliConn
*
conn
,
bool
update
)
{
...
@@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
...
@@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
// list already create before
// list already create before
assert
(
plist
!=
NULL
);
assert
(
plist
!=
NULL
);
QUEUE_INIT
(
&
conn
->
conn
);
QUEUE_INIT
(
&
conn
->
q
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
conn
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
q
);
assert
(
!
QUEUE_IS_EMPTY
(
&
plist
->
conn
));
assert
(
!
QUEUE_IS_EMPTY
(
&
plist
->
conn
));
}
}
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
...
@@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
...
@@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
transReqQueueInit
(
&
conn
->
wreqQueue
);
transReqQueueInit
(
&
conn
->
wreqQueue
);
transQueueInit
(
&
conn
->
cliMsgs
,
NULL
);
transQueueInit
(
&
conn
->
cliMsgs
,
NULL
);
QUEUE_INIT
(
&
conn
->
conn
);
QUEUE_INIT
(
&
conn
->
q
);
conn
->
hostThrd
=
pThrd
;
conn
->
hostThrd
=
pThrd
;
conn
->
status
=
ConnNormal
;
conn
->
status
=
ConnNormal
;
conn
->
broken
=
0
;
conn
->
broken
=
0
;
...
@@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
...
@@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
}
}
static
void
cliDestroyConn
(
SCliConn
*
conn
,
bool
clear
)
{
static
void
cliDestroyConn
(
SCliConn
*
conn
,
bool
clear
)
{
tTrace
(
"%s conn %p remove from conn pool"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
tTrace
(
"%s conn %p remove from conn pool"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
QUEUE_REMOVE
(
&
conn
->
conn
);
QUEUE_REMOVE
(
&
conn
->
q
);
QUEUE_INIT
(
&
conn
->
conn
);
QUEUE_INIT
(
&
conn
->
q
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
conn
->
refId
=
-
1
;
conn
->
refId
=
-
1
;
...
@@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) {
...
@@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP
(
pConn
);
CONN_SET_PERSIST_BY_APP
(
pConn
);
}
}
uv_write_t
*
req
=
transReqQueuePush
Req
(
&
pConn
->
wreqQueue
);
uv_write_t
*
req
=
transReqQueuePush
(
&
pConn
->
wreqQueue
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
cliSendCb
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
cliSendCb
);
return
;
return
;
_RETURN:
_RETURN:
...
@@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() {
...
@@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() {
pThrd
->
loop
=
(
uv_loop_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
pThrd
->
loop
=
(
uv_loop_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
uv_loop_init
(
pThrd
->
loop
);
uv_loop_init
(
pThrd
->
loop
);
pThrd
->
asyncPool
=
trans
CreateAsyncPool
(
pThrd
->
loop
,
5
,
pThrd
,
cliAsyncCb
);
pThrd
->
asyncPool
=
trans
AsyncPoolCreate
(
pThrd
->
loop
,
5
,
pThrd
,
cliAsyncCb
);
uv_timer_init
(
pThrd
->
loop
,
&
pThrd
->
timer
);
uv_timer_init
(
pThrd
->
loop
,
&
pThrd
->
timer
);
pThrd
->
timer
.
data
=
pThrd
;
pThrd
->
timer
.
data
=
pThrd
;
...
@@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
...
@@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
CLI_RELEASE_UV
(
pThrd
->
loop
);
CLI_RELEASE_UV
(
pThrd
->
loop
);
taosThreadMutexDestroy
(
&
pThrd
->
msgMtx
);
taosThreadMutexDestroy
(
&
pThrd
->
msgMtx
);
TRANS_DESTROY_ASYNC_POOL_MSG
(
pThrd
->
asyncPool
,
SCliMsg
,
destroyCmsg
);
TRANS_DESTROY_ASYNC_POOL_MSG
(
pThrd
->
asyncPool
,
SCliMsg
,
destroyCmsg
);
trans
DestroyAsyncPool
(
pThrd
->
asyncPool
);
trans
AsyncPoolDestroy
(
pThrd
->
asyncPool
);
transDQDestroy
(
pThrd
->
delayQueue
,
destroyCmsg
);
transDQDestroy
(
pThrd
->
delayQueue
,
destroyCmsg
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
->
loop
);
...
@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) {
...
@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) {
cliHandleReq
(
pMsg
,
pThrd
);
cliHandleReq
(
pMsg
,
pThrd
);
}
}
static
void
doCloseIdleConn
(
void
*
param
)
{
STaskArg
*
arg
=
param
;
SCliConn
*
conn
=
arg
->
param1
;
SCliThrd
*
pThrd
=
arg
->
param2
;
}
static
void
cliSchedMsgToNextNode
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
static
void
cliSchedMsgToNextNode
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
...
@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
...
@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
}
}
}
}
bool
cliTry
To
ExtractEpSet
(
STransMsg
*
pResp
,
SEpSet
*
dst
)
{
bool
cliTryExtractEpSet
(
STransMsg
*
pResp
,
SEpSet
*
dst
)
{
if
((
pResp
==
NULL
||
pResp
->
info
.
hasEpSet
==
0
))
{
if
((
pResp
==
NULL
||
pResp
->
info
.
hasEpSet
==
0
))
{
return
false
;
return
false
;
}
}
...
@@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
...
@@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
*/
*/
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
int32_t
code
=
pResp
->
code
;
int32_t
code
=
pResp
->
code
;
bool
retry
=
(
pTransInst
->
retry
!=
NULL
&&
pTransInst
->
retry
(
code
,
pResp
->
msgType
-
1
))
?
true
:
false
;
bool
retry
=
(
pTransInst
->
retry
!=
NULL
&&
pTransInst
->
retry
(
code
,
pResp
->
msgType
-
1
))
?
true
:
false
;
if
(
retry
)
{
if
(
retry
)
{
pMsg
->
sent
=
0
;
pMsg
->
sent
=
0
;
pCtx
->
retryCnt
+=
1
;
pCtx
->
retryCnt
+=
1
;
...
@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
...
@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if
(
pCtx
->
retryCnt
<
pCtx
->
retryLimit
)
{
if
(
pCtx
->
retryCnt
<
pCtx
->
retryLimit
)
{
transUnrefCliHandle
(
pConn
);
transUnrefCliHandle
(
pConn
);
EPSET_FORWARD_INUSE
(
&
pCtx
->
epSet
);
EPSET_FORWARD_INUSE
(
&
pCtx
->
epSet
);
transFreeMsg
(
pResp
->
pCont
);
cliSchedMsgToNextNode
(
pMsg
,
pThrd
);
cliSchedMsgToNextNode
(
pMsg
,
pThrd
);
return
-
1
;
return
-
1
;
}
}
...
@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
...
@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId
*
trace
=
&
pResp
->
info
.
traceId
;
STraceId
*
trace
=
&
pResp
->
info
.
traceId
;
bool
hasEpSet
=
cliTry
To
ExtractEpSet
(
pResp
,
&
pCtx
->
epSet
);
bool
hasEpSet
=
cliTryExtractEpSet
(
pResp
,
&
pCtx
->
epSet
);
if
(
hasEpSet
)
{
if
(
hasEpSet
)
{
char
tbuf
[
256
]
=
{
0
};
char
tbuf
[
256
]
=
{
0
};
EPSET_DEBUG_STR
(
&
pCtx
->
epSet
,
tbuf
);
EPSET_DEBUG_STR
(
&
pCtx
->
epSet
,
tbuf
);
...
@@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
...
@@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
tGDebug
(
"%s send request at thread:%08"
PRId64
", dst:%s:%d, app:%p"
,
transLabel
(
pTransInst
),
pThrd
->
pid
,
tGDebug
(
"%s send request at thread:%08"
PRId64
", dst:%s:%d, app:%p"
,
transLabel
(
pTransInst
),
pThrd
->
pid
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
),
pReq
->
info
.
ahandle
);
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
),
pReq
->
info
.
ahandle
);
if
(
0
!=
transAsyncSend
(
pThrd
->
asyncPool
,
&
cliMsg
->
q
))
{
int
ret
=
transAsyncSend
(
pThrd
->
asyncPool
,
&
cliMsg
->
q
);
tsem_destroy
(
sem
);
if
(
ret
!=
0
)
{
taosMemoryFree
(
sem
);
destroyCmsg
(
cliMsg
);
destroyCmsg
(
cliMsg
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
goto
_RETURN
;
return
-
1
;
}
}
tsem_wait
(
sem
);
tsem_wait
(
sem
);
_RETURN:
tsem_destroy
(
sem
);
tsem_destroy
(
sem
);
taosMemoryFree
(
sem
);
taosMemoryFree
(
sem
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
return
0
;
return
ret
;
}
}
/*
/*
*
*
...
...
source/libs/transport/src/transComm.c
浏览文件 @
eb5dbffc
...
@@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) {
...
@@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) {
return
ret
;
return
ret
;
}
}
SAsyncPool
*
trans
CreateAsyncPool
(
uv_loop_t
*
loop
,
int
sz
,
void
*
arg
,
AsyncCB
cb
)
{
SAsyncPool
*
trans
AsyncPoolCreate
(
uv_loop_t
*
loop
,
int
sz
,
void
*
arg
,
AsyncCB
cb
)
{
SAsyncPool
*
pool
=
taosMemoryCalloc
(
1
,
sizeof
(
SAsyncPool
));
SAsyncPool
*
pool
=
taosMemoryCalloc
(
1
,
sizeof
(
SAsyncPool
));
pool
->
nAsync
=
sz
;
pool
->
nAsync
=
sz
;
pool
->
asyncs
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_async_t
)
*
pool
->
nAsync
);
pool
->
asyncs
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_async_t
)
*
pool
->
nAsync
);
...
@@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
...
@@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
return
pool
;
return
pool
;
}
}
void
trans
DestroyAsyncPool
(
SAsyncPool
*
pool
)
{
void
trans
AsyncPoolDestroy
(
SAsyncPool
*
pool
)
{
for
(
int
i
=
0
;
i
<
pool
->
nAsync
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pool
->
nAsync
;
i
++
)
{
uv_async_t
*
async
=
&
(
pool
->
asyncs
[
i
]);
uv_async_t
*
async
=
&
(
pool
->
asyncs
[
i
]);
// uv_close((uv_handle_t*)async, NULL);
// uv_close((uv_handle_t*)async, NULL);
...
@@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
...
@@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree
(
pool
->
asyncs
);
taosMemoryFree
(
pool
->
asyncs
);
taosMemoryFree
(
pool
);
taosMemoryFree
(
pool
);
}
}
bool
transAsyncPoolIsEmpty
(
SAsyncPool
*
pool
)
{
for
(
int
i
=
0
;
i
<
pool
->
nAsync
;
i
++
)
{
uv_async_t
*
async
=
&
(
pool
->
asyncs
[
i
]);
SAsyncItem
*
item
=
async
->
data
;
if
(
!
QUEUE_IS_EMPTY
(
&
item
->
qmsg
))
return
false
;
}
return
true
;
}
int
transAsyncSend
(
SAsyncPool
*
pool
,
queue
*
q
)
{
int
transAsyncSend
(
SAsyncPool
*
pool
,
queue
*
q
)
{
if
(
atomic_load_8
(
&
pool
->
stop
)
==
1
)
{
if
(
atomic_load_8
(
&
pool
->
stop
)
==
1
)
{
return
-
1
;
return
-
1
;
...
@@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
...
@@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
}
}
return
uv_async_send
(
async
);
return
uv_async_send
(
async
);
}
}
bool
transAsyncPoolIsEmpty
(
SAsyncPool
*
pool
)
{
for
(
int
i
=
0
;
i
<
pool
->
nAsync
;
i
++
)
{
uv_async_t
*
async
=
&
(
pool
->
asyncs
[
i
]);
SAsyncItem
*
item
=
async
->
data
;
if
(
!
QUEUE_IS_EMPTY
(
&
item
->
qmsg
))
return
false
;
}
return
true
;
}
void
transCtxInit
(
STransCtx
*
ctx
)
{
void
transCtxInit
(
STransCtx
*
ctx
)
{
// init transCtx
// init transCtx
...
@@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) {
...
@@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) {
// init req queue
// init req queue
QUEUE_INIT
(
q
);
QUEUE_INIT
(
q
);
}
}
void
*
transReqQueuePush
Req
(
queue
*
q
)
{
void
*
transReqQueuePush
(
queue
*
q
)
{
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
));
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
));
STransReq
*
wreq
=
taosMemoryCalloc
(
1
,
sizeof
(
STransReq
));
STransReq
*
wreq
=
taosMemoryCalloc
(
1
,
sizeof
(
STransReq
));
wreq
->
data
=
req
;
wreq
->
data
=
req
;
...
@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
...
@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
heapDestroy
(
queue
->
heap
);
heapDestroy
(
queue
->
heap
);
taosMemoryFree
(
queue
);
taosMemoryFree
(
queue
);
}
}
void
transDQCancel
(
SDelayQueue
*
queue
,
SDelayTask
*
task
)
{
uv_timer_stop
(
queue
->
timer
);
if
(
heapSize
(
queue
->
heap
)
<=
0
)
return
;
heapRemove
(
queue
->
heap
,
&
task
->
node
);
int
transDQSched
(
SDelayQueue
*
queue
,
void
(
*
func
)(
void
*
arg
),
void
*
arg
,
uint64_t
timeoutMs
)
{
if
(
heapSize
(
queue
->
heap
)
!=
0
)
{
HeapNode
*
minNode
=
heapMin
(
queue
->
heap
);
if
(
minNode
!=
NULL
)
return
;
uint64_t
now
=
taosGetTimestampMs
();
SDelayTask
*
task
=
container_of
(
minNode
,
SDelayTask
,
node
);
uint64_t
timeout
=
now
>
task
->
execTime
?
now
-
task
->
execTime
:
0
;
uv_timer_start
(
queue
->
timer
,
transDQTimeout
,
timeout
,
0
);
}
}
SDelayTask
*
transDQSched
(
SDelayQueue
*
queue
,
void
(
*
func
)(
void
*
arg
),
void
*
arg
,
uint64_t
timeoutMs
)
{
uint64_t
now
=
taosGetTimestampMs
();
uint64_t
now
=
taosGetTimestampMs
();
SDelayTask
*
task
=
taosMemoryCalloc
(
1
,
sizeof
(
SDelayTask
));
SDelayTask
*
task
=
taosMemoryCalloc
(
1
,
sizeof
(
SDelayTask
));
task
->
func
=
func
;
task
->
func
=
func
;
...
@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
...
@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
tTrace
(
"timer %p put task into delay queue, timeoutMs:%"
PRIu64
,
queue
->
timer
,
timeoutMs
);
tTrace
(
"timer %p put task into delay queue, timeoutMs:%"
PRIu64
,
queue
->
timer
,
timeoutMs
);
heapInsert
(
queue
->
heap
,
&
task
->
node
);
heapInsert
(
queue
->
heap
,
&
task
->
node
);
uv_timer_start
(
queue
->
timer
,
transDQTimeout
,
timeoutMs
,
0
);
uv_timer_start
(
queue
->
timer
,
transDQTimeout
,
timeoutMs
,
0
);
return
0
;
return
task
;
}
}
void
transPrintEpSet
(
SEpSet
*
pEpSet
)
{
void
transPrintEpSet
(
SEpSet
*
pEpSet
)
{
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
eb5dbffc
...
@@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
...
@@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
uvPrepareSendData
(
smsg
,
&
wb
);
uvPrepareSendData
(
smsg
,
&
wb
);
transRefSrvHandle
(
pConn
);
transRefSrvHandle
(
pConn
);
uv_write_t
*
req
=
transReqQueuePush
Req
(
&
pConn
->
wreqQueue
);
uv_write_t
*
req
=
transReqQueuePush
(
&
pConn
->
wreqQueue
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
}
}
static
void
uvStartSendResp
(
SSvrMsg
*
smsg
)
{
static
void
uvStartSendResp
(
SSvrMsg
*
smsg
)
{
...
@@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
...
@@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
// conn set
// conn set
QUEUE_INIT
(
&
pThrd
->
conn
);
QUEUE_INIT
(
&
pThrd
->
conn
);
pThrd
->
asyncPool
=
trans
CreateAsyncPool
(
pThrd
->
loop
,
1
,
pThrd
,
uvWorkerAsyncCb
);
pThrd
->
asyncPool
=
trans
AsyncPoolCreate
(
pThrd
->
loop
,
1
,
pThrd
,
uvWorkerAsyncCb
);
uv_pipe_connect
(
&
pThrd
->
connect_req
,
pThrd
->
pipe
,
pipeName
,
uvOnPipeConnectionCb
);
uv_pipe_connect
(
&
pThrd
->
connect_req
,
pThrd
->
pipe
,
pipeName
,
uvOnPipeConnectionCb
);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return
true
;
return
true
;
...
@@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
...
@@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
taosThreadJoin
(
pThrd
->
thread
,
NULL
);
taosThreadJoin
(
pThrd
->
thread
,
NULL
);
SRV_RELEASE_UV
(
pThrd
->
loop
);
SRV_RELEASE_UV
(
pThrd
->
loop
);
TRANS_DESTROY_ASYNC_POOL_MSG
(
pThrd
->
asyncPool
,
SSvrMsg
,
destroySmsg
);
TRANS_DESTROY_ASYNC_POOL_MSG
(
pThrd
->
asyncPool
,
SSvrMsg
,
destroySmsg
);
trans
DestroyAsyncPool
(
pThrd
->
asyncPool
);
trans
AsyncPoolDestroy
(
pThrd
->
asyncPool
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
);
taosMemoryFree
(
pThrd
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录