Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ed788d39
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ed788d39
编写于
3月 14, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
formate
上级
0deef5aa
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
47 addition
and
45 deletion
+47
-45
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+8
-12
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+34
-28
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+5
-5
未找到文件。
source/libs/transport/inc/transComm.h
浏览文件 @
ed788d39
...
...
@@ -120,8 +120,8 @@ typedef struct {
// SEpSet* pSet; // for synchronous API
}
SRpcReqContext
;
typedef
SRpcMsg
STransMsg
;
typedef
SRpcInfo
STrans
;
typedef
SRpcMsg
STransMsg
;
typedef
SRpcInfo
STrans
;
typedef
SRpcConnInfo
STransHandleInfo
;
typedef
struct
{
...
...
@@ -139,7 +139,7 @@ typedef struct {
int64_t
rid
;
// refId returned by taosAddRef
STransMsg
*
pRsp
;
// for synchronous API
tsem_t
*
pSem
;
// for synchronous API
tsem_t
*
pSem
;
// for synchronous API
int
hThrdIdx
;
char
*
ip
;
...
...
@@ -147,7 +147,6 @@ typedef struct {
// SEpSet* pSet; // for synchronous API
}
STransConnCtx
;
#pragma pack(push, 1)
typedef
struct
{
...
...
@@ -248,24 +247,21 @@ bool transReadComplete(SConnBuffer* connBuf);
int
transSetConnOption
(
uv_tcp_t
*
stream
);
void
transRefSrvHandle
(
void
*
handle
);
void
transUnrefSrvHandle
(
void
*
handle
);
void
transRefCliHandle
(
void
*
handle
);
void
transUnrefCliHandle
(
void
*
handle
);
void
transSendRequest
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
);
void
transSendRecv
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
,
STransMsg
*
pRsp
);
void
transSendRequest
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
);
void
transSendRecv
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
,
STransMsg
*
pRsp
);
void
transSendResponse
(
const
STransMsg
*
pMsg
);
int
transGetConnInfo
(
void
*
thandle
,
STransHandleInfo
*
pInfo
);
int
transGetConnInfo
(
void
*
thandle
,
STransHandleInfo
*
pInfo
);
void
*
transInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
*
transInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
transCloseClient
(
void
*
arg
);
void
transCloseServer
(
void
*
arg
);
void
transCloseClient
(
void
*
arg
);
void
transCloseServer
(
void
*
arg
);
#endif
source/libs/transport/src/transCli.c
浏览文件 @
ed788d39
...
...
@@ -42,7 +42,7 @@ typedef struct SCliConn {
typedef
struct
SCliMsg
{
STransConnCtx
*
ctx
;
STransMsg
msg
;
STransMsg
msg
;
queue
q
;
uint64_t
st
;
}
SCliMsg
;
...
...
@@ -122,7 +122,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \
if (thrd->quit) { \
cliHandleExcept(conn); \
cliHandleExcept(conn);
\
goto _RETURE; \
} \
} while (0)
...
...
@@ -130,19 +130,25 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HANDLE_BROKEN(conn) \
do { \
if (conn->broken) { \
cliHandleExcept(conn); \
cliHandleExcept(conn);
\
goto _RETURE; \
} \
} while (0);
#define CONN_SET_PERSIST_BY_APP(conn) do { if (conn->persist == false) { conn->persist = true; transRefCliHandle(conn);}} while(0)
#define CONN_SET_PERSIST_BY_APP(conn) \
do { \
if (conn->persist == false) { \
conn->persist = true; \
transRefCliHandle(conn); \
} \
} while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
static
void
*
cliWorkThread
(
void
*
arg
);
void
cliHandleResp
(
SCliConn
*
conn
)
{
void
cliHandleResp
(
SCliConn
*
conn
)
{
SCliThrdObj
*
pThrd
=
conn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)(
conn
->
readBuf
.
buf
);
pHead
->
code
=
htonl
(
pHead
->
code
);
...
...
@@ -156,16 +162,16 @@ void cliHandleResp(SCliConn* conn) {
rpcMsg
.
pCont
=
transContFromHead
((
char
*
)
pHead
);
rpcMsg
.
code
=
pHead
->
code
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
ahandle
=
NULL
;
rpcMsg
.
ahandle
=
NULL
;
SCliMsg
*
pMsg
=
conn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
?
pMsg
->
ctx
:
NULL
;
SCliMsg
*
pMsg
=
conn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
?
pMsg
->
ctx
:
NULL
;
if
(
pMsg
==
NULL
&&
!
CONN_NO_PERSIST_BY_APP
(
conn
))
{
rpcMsg
.
ahandle
=
pTransInst
->
mfp
?
(
*
pTransInst
->
mfp
)(
pTransInst
->
parent
,
rpcMsg
.
msgType
)
:
NULL
;
}
else
{
rpcMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
rpcMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
}
//if (rpcMsg.ahandle == NULL) {
//
if (rpcMsg.ahandle == NULL) {
// tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn);
// return;
//}
...
...
@@ -207,26 +213,26 @@ void cliHandleResp(SCliConn* conn) {
void
cliHandleExcept
(
SCliConn
*
pConn
)
{
if
(
pConn
->
data
==
NULL
)
{
if
(
pConn
->
broken
==
true
||
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
transUnrefCliHandle
(
pConn
);
return
;
}
if
(
pConn
->
broken
==
true
||
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
transUnrefCliHandle
(
pConn
);
return
;
}
}
SCliThrdObj
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
SCliMsg
*
pMsg
=
pConn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
?
pMsg
->
ctx
:
NULL
;
STransConnCtx
*
pCtx
=
pMsg
?
pMsg
->
ctx
:
NULL
;
STransMsg
rpcMsg
=
{
0
};
rpcMsg
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
rpcMsg
.
msgType
=
pMsg
?
pMsg
->
msg
.
msgType
+
1
:
0
;
rpcMsg
.
ahandle
=
NULL
;
rpcMsg
.
ahandle
=
NULL
;
if
(
pMsg
==
NULL
&&
!
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
rpcMsg
.
ahandle
=
pTransInst
->
mfp
?
(
*
pTransInst
->
mfp
)(
pTransInst
->
parent
,
rpcMsg
.
msgType
)
:
NULL
;
}
else
{
rpcMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
rpcMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
}
if
(
pCtx
==
NULL
||
pCtx
->
pSem
==
NULL
)
{
...
...
@@ -246,7 +252,7 @@ void cliHandleExcept(SCliConn* pConn) {
void
cliTimeoutCb
(
uv_timer_t
*
handle
)
{
SCliThrdObj
*
pThrd
=
handle
->
data
;
STrans
*
pRpc
=
pThrd
->
pTransInst
;
STrans
*
pRpc
=
pThrd
->
pTransInst
;
int64_t
currentTime
=
pThrd
->
nextTimeout
;
tTrace
(
"%s, cli conn timeout, try to remove expire conn from conn pool"
,
pRpc
->
label
);
...
...
@@ -420,7 +426,7 @@ void cliSend(SCliConn* pConn) {
STransConnCtx
*
pCtx
=
pCliMsg
->
ctx
;
SCliThrdObj
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STransMsg
*
pMsg
=
(
STransMsg
*
)(
&
pCliMsg
->
msg
);
...
...
@@ -513,7 +519,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace
(
"%s cli msg tran time cost: %"
PRIu64
"us"
,
((
STrans
*
)
pThrd
->
pTransInst
)
->
label
,
el
);
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
SCliConn
*
conn
=
cliGetConn
(
pMsg
,
pThrd
);
if
(
conn
!=
NULL
)
{
...
...
@@ -534,7 +540,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace
(
"%s cli conn %p try to connect to %s:%d"
,
pTransInst
->
label
,
conn
,
pMsg
->
ctx
->
ip
,
pMsg
->
ctx
->
port
);
uv_tcp_connect
(
&
conn
->
connReq
,
(
uv_tcp_t
*
)(
conn
->
stream
),
(
const
struct
sockaddr
*
)
&
addr
,
cliConnCb
);
}
conn
->
hThrdIdx
=
pCtx
->
hThrdIdx
;
}
static
void
cliAsyncCb
(
uv_async_t
*
handle
)
{
...
...
@@ -543,7 +549,7 @@ static void cliAsyncCb(uv_async_t* handle) {
SCliMsg
*
pMsg
=
NULL
;
// batch process to avoid to lock/unlock frequently
queue
wq
;
queue
wq
;
pthread_mutex_lock
(
&
item
->
mtx
);
QUEUE_MOVE
(
&
item
->
qmsg
,
&
wq
);
pthread_mutex_unlock
(
&
item
->
mtx
);
...
...
@@ -689,9 +695,9 @@ void transUnrefCliHandle(void* handle) {
}
}
void
transSendRequest
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
)
{
void
transSendRequest
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
int
index
=
CONN_HOST_THREAD_INDEX
((
SCliConn
*
)
pMsg
->
handle
);
int
index
=
CONN_HOST_THREAD_INDEX
((
SCliConn
*
)
pMsg
->
handle
);
if
(
index
==
-
1
)
{
index
=
cliRBChoseIdx
(
pTransInst
);
}
...
...
@@ -718,9 +724,9 @@ void transSendRequest(void *shandle, const char *ip, uint32_t port, STransMsg *p
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
index
];
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
));
}
void
transSendRecv
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pReq
,
STransMsg
*
pRsp
)
{
void
transSendRecv
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pReq
,
STransMsg
*
pRsp
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
int
index
=
CONN_HOST_THREAD_INDEX
(
pReq
->
handle
);
int
index
=
CONN_HOST_THREAD_INDEX
(
pReq
->
handle
);
if
(
index
==
-
1
)
{
index
=
cliRBChoseIdx
(
pTransInst
);
}
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
ed788d39
...
...
@@ -48,7 +48,7 @@ typedef struct SSrvConn {
typedef
struct
SSrvMsg
{
SSrvConn
*
pConn
;
STransMsg
msg
;
STransMsg
msg
;
queue
q
;
}
SSrvMsg
;
...
...
@@ -317,8 +317,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later;
tTrace
(
"server conn %p prepare to send resp"
,
smsg
->
pConn
);
SSrvConn
*
pConn
=
smsg
->
pConn
;
STransMsg
*
pMsg
=
&
smsg
->
msg
;
SSrvConn
*
pConn
=
smsg
->
pConn
;
STransMsg
*
pMsg
=
&
smsg
->
msg
;
if
(
pMsg
->
pCont
==
0
)
{
pMsg
->
pCont
=
(
void
*
)
rpcMallocCont
(
0
);
pMsg
->
contLen
=
0
;
...
...
@@ -798,8 +798,8 @@ void transSendResponse(const STransMsg* pMsg) {
tTrace
(
"server conn %p start to send resp"
,
pConn
);
transSendAsync
(
pThrd
->
asyncPool
,
&
srvMsg
->
q
);
}
int
transGetConnInfo
(
void
*
thandle
,
STransHandleInfo
*
pInfo
)
{
SSrvConn
*
pConn
=
thandle
;
int
transGetConnInfo
(
void
*
thandle
,
STransHandleInfo
*
pInfo
)
{
SSrvConn
*
pConn
=
thandle
;
struct
sockaddr_in
addr
=
pConn
->
addr
;
pInfo
->
clientIp
=
(
uint32_t
)(
addr
.
sin_addr
.
s_addr
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录