Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
48968822
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
48968822
编写于
1月 21, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
1月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9939 from taosdata/feature/rpc
refactor rpc
上级
80a07c38
616a02f5
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
121 addition
and
18 deletion
+121
-18
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+1
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+107
-11
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+11
-7
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+2
-0
未找到文件。
source/libs/transport/src/trans.c
浏览文件 @
48968822
...
@@ -70,6 +70,7 @@ int32_t rpcInit(void) {
...
@@ -70,6 +70,7 @@ int32_t rpcInit(void) {
void
rpcCleanup
(
void
)
{
void
rpcCleanup
(
void
)
{
// impl later
// impl later
//
return
;
return
;
}
}
#endif
#endif
source/libs/transport/src/transCli.c
浏览文件 @
48968822
...
@@ -26,6 +26,7 @@ typedef struct SCliConn {
...
@@ -26,6 +26,7 @@ typedef struct SCliConn {
queue
conn
;
queue
conn
;
char
spi
;
char
spi
;
char
secured
;
char
secured
;
uint64_t
expireTime
;
}
SCliConn
;
}
SCliConn
;
typedef
struct
SCliMsg
{
typedef
struct
SCliMsg
{
...
@@ -39,10 +40,13 @@ typedef struct SCliThrdObj {
...
@@ -39,10 +40,13 @@ typedef struct SCliThrdObj {
pthread_t
thread
;
pthread_t
thread
;
uv_loop_t
*
loop
;
uv_loop_t
*
loop
;
uv_async_t
*
cliAsync
;
//
uv_async_t
*
cliAsync
;
//
uv_timer_t
*
pTimer
;
void
*
cache
;
// conn pool
void
*
cache
;
// conn pool
queue
msg
;
queue
msg
;
pthread_mutex_t
msgMtx
;
pthread_mutex_t
msgMtx
;
void
*
shandle
;
uint64_t
nextTimeout
;
// next timeout
void
*
shandle
;
//
}
SCliThrdObj
;
}
SCliThrdObj
;
typedef
struct
SClientObj
{
typedef
struct
SClientObj
{
...
@@ -52,10 +56,19 @@ typedef struct SClientObj {
...
@@ -52,10 +56,19 @@ typedef struct SClientObj {
SCliThrdObj
**
pThreadObj
;
SCliThrdObj
**
pThreadObj
;
}
SClientObj
;
}
SClientObj
;
typedef
struct
SConnList
{
queue
conn
;
}
SConnList
;
// conn pool
// conn pool
// add expire timeout and capacity limit
static
void
*
connCacheCreate
(
int
size
);
static
void
*
connCacheDestroy
(
void
*
cache
);
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
);
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
);
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
);
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
);
// register timer in each thread to clear expire conn
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
);
// process data read from server, auth/decompress etc
// process data read from server, auth/decompress etc
static
void
clientProcessData
(
SCliConn
*
conn
);
static
void
clientProcessData
(
SCliConn
*
conn
);
// check whether already read complete packet from server
// check whether already read complete packet from server
...
@@ -77,10 +90,93 @@ static void clientMsgDestroy(SCliMsg* pMsg);
...
@@ -77,10 +90,93 @@ static void clientMsgDestroy(SCliMsg* pMsg);
static
void
*
clientThread
(
void
*
arg
);
static
void
*
clientThread
(
void
*
arg
);
static
void
clientProcessData
(
SCliConn
*
conn
)
{
static
void
clientProcessData
(
SCliConn
*
conn
)
{
STransConnCtx
*
pCtx
=
((
SCliMsg
*
)
conn
->
data
)
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
ahandle
;
SRpcMsg
rpcMsg
;
rpcMsg
.
pCont
=
conn
->
readBuf
.
buf
;
rpcMsg
.
contLen
=
conn
->
readBuf
.
len
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
// impl
// impl
}
}
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
)
{
SCliThrdObj
*
pThrd
=
handle
->
data
;
SRpcInfo
*
pRpc
=
pThrd
->
shandle
;
int64_t
currentTime
=
pThrd
->
nextTimeout
;
SConnList
*
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
cache
,
NULL
);
while
(
p
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
p
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
p
->
conn
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
if
(
c
->
expireTime
<
currentTime
)
{
QUEUE_REMOVE
(
h
);
clientConnDestroy
(
c
);
}
else
{
break
;
}
}
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
cache
,
p
);
}
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
uv_timer_start
(
handle
,
clientTimeoutCb
,
pRpc
->
idleTime
*
10
,
0
);
}
static
void
*
connCacheCreate
(
int
size
)
{
SHashObj
*
cache
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
return
false
;
}
static
void
*
connCacheDestroy
(
void
*
cache
)
{
SConnList
*
connList
=
taosHashIterate
((
SHashObj
*
)
cache
,
NULL
);
while
(
connList
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
connList
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
connList
->
conn
);
QUEUE_REMOVE
(
h
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
clientConnDestroy
(
c
);
}
connList
=
taosHashIterate
((
SHashObj
*
)
cache
,
connList
);
}
taosHashClear
(
cache
);
}
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
)
{
char
key
[
128
]
=
{
0
};
tstrncpy
(
key
,
ip
,
strlen
(
ip
));
tstrncpy
(
key
+
strlen
(
key
),
(
char
*
)(
&
port
),
sizeof
(
port
));
SHashObj
*
pCache
=
cache
;
SConnList
*
plist
=
taosHashGet
(
pCache
,
key
,
strlen
(
key
));
if
(
plist
==
NULL
)
{
SConnList
list
;
plist
=
&
list
;
QUEUE_INIT
(
&
plist
->
conn
);
taosHashPut
(
pCache
,
key
,
strlen
(
key
),
plist
,
sizeof
(
*
plist
));
}
if
(
QUEUE_IS_EMPTY
(
&
plist
->
conn
))
{
return
NULL
;
}
queue
*
h
=
QUEUE_HEAD
(
&
plist
->
conn
);
QUEUE_REMOVE
(
h
);
return
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
}
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
)
{
char
key
[
128
]
=
{
0
};
tstrncpy
(
key
,
ip
,
strlen
(
ip
));
tstrncpy
(
key
+
strlen
(
key
),
(
char
*
)(
&
port
),
sizeof
(
port
));
STransConnCtx
*
ctx
=
((
SCliMsg
*
)
conn
->
data
)
->
ctx
;
SRpcInfo
*
pRpc
=
ctx
->
pRpc
;
conn
->
expireTime
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
cache
,
key
,
strlen
(
key
));
// list already create before
assert
(
plist
!=
NULL
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
conn
);
}
static
bool
clientReadComplete
(
SConnBuffer
*
data
)
{
static
bool
clientReadComplete
(
SConnBuffer
*
data
)
{
STransMsgHead
head
;
STransMsgHead
head
;
int32_t
headLen
=
sizeof
(
head
);
int32_t
headLen
=
sizeof
(
head
);
...
@@ -152,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) {
...
@@ -152,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) {
}
}
static
void
clientDestroy
(
uv_handle_t
*
handle
)
{
static
void
clientDestroy
(
uv_handle_t
*
handle
)
{
SCliConn
*
conn
=
handle
->
data
;
SCliConn
*
conn
=
handle
->
data
;
QUEUE_REMOVE
(
&
conn
->
conn
);
clientConnDestroy
(
conn
);
clientConnDestroy
(
conn
);
}
}
...
@@ -206,15 +303,6 @@ static void clientConnCb(uv_connect_t* req, int status) {
...
@@ -206,15 +303,6 @@ static void clientConnCb(uv_connect_t* req, int status) {
clientWrite
(
pConn
);
clientWrite
(
pConn
);
}
}
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
)
{
// impl later
return
NULL
;
}
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
)
{
// impl later
}
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
uint64_t
et
=
taosGetTimestampUs
();
uint64_t
et
=
taosGetTimestampUs
();
uint64_t
el
=
et
-
pMsg
->
st
;
uint64_t
el
=
et
-
pMsg
->
st
;
...
@@ -234,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
...
@@ -234,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn
->
stream
=
(
uv_stream_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
conn
->
stream
=
(
uv_stream_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
conn
->
writeReq
=
malloc
(
sizeof
(
uv_write_t
));
conn
->
writeReq
=
malloc
(
sizeof
(
uv_write_t
));
QUEUE_INIT
(
&
conn
->
conn
);
conn
->
connReq
.
data
=
conn
;
conn
->
connReq
.
data
=
conn
;
conn
->
data
=
pMsg
;
conn
->
data
=
pMsg
;
...
@@ -270,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) {
...
@@ -270,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) {
static
void
*
clientThread
(
void
*
arg
)
{
static
void
*
clientThread
(
void
*
arg
)
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
SRpcInfo
*
pRpc
=
pThrd
->
shandle
;
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
uv_timer_start
(
pThrd
->
pTimer
,
clientTimeoutCb
,
pRpc
->
idleTime
*
10
,
0
);
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
}
}
...
@@ -291,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
...
@@ -291,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
pThrd
->
cliAsync
->
data
=
pThrd
;
pThrd
->
cliAsync
->
data
=
pThrd
;
pThrd
->
pTimer
=
malloc
(
sizeof
(
uv_timer_t
));
uv_timer_init
(
pThrd
->
loop
,
pThrd
->
pTimer
);
pThrd
->
shandle
=
shandle
;
pThrd
->
shandle
=
shandle
;
int
err
=
pthread_create
(
&
pThrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
pThrd
));
int
err
=
pthread_create
(
&
pThrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
pThrd
));
if
(
err
==
0
)
{
if
(
err
==
0
)
{
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
48968822
...
@@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
...
@@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
// refers specifically to query or insert timeout
// refers specifically to query or insert timeout
static
void
uvHandleActivityTimeout
(
uv_timer_t
*
handle
)
{
static
void
uvHandleActivityTimeout
(
uv_timer_t
*
handle
)
{
// impl later
SConn
*
conn
=
handle
->
data
;
SConn
*
conn
=
handle
->
data
;
tDebug
(
"%p timeout since no activity"
,
conn
);
}
}
static
void
uvProcessData
(
SConn
*
pConn
)
{
static
void
uvProcessData
(
SConn
*
pConn
)
{
...
@@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) {
...
@@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) {
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
p
->
shandle
;
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
p
->
shandle
;
// auth here
// auth here
// auth should not do in rpc thread
int8_t
code
=
uvAuthMsg
(
pConn
,
(
char
*
)
pHead
,
p
->
msgLen
);
//
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
if
(
code
!=
0
)
{
//
if (code != 0) {
terrno
=
code
;
//
terrno = code;
return
;
//
return;
}
//
}
pHead
->
code
=
htonl
(
pHead
->
code
);
pHead
->
code
=
htonl
(
pHead
->
code
);
int32_t
dlen
=
0
;
int32_t
dlen
=
0
;
...
@@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) {
...
@@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) {
}
else
{
}
else
{
// impl later
// impl later
}
}
rpcMsg
.
contLen
=
rpc
ContLenFromMsg
(
pHead
->
msgLen
);
rpcMsg
.
contLen
=
trans
ContLenFromMsg
(
pHead
->
msgLen
);
rpcMsg
.
pCont
=
pHead
->
content
;
rpcMsg
.
pCont
=
pHead
->
content
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
code
=
pHead
->
code
;
rpcMsg
.
code
=
pHead
->
code
;
...
@@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
...
@@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
return
;
return
;
}
}
uv_buf_t
wb
=
uv_buf_init
(
conn
->
writeBuf
.
buf
,
conn
->
writeBuf
.
len
);
uv_buf_t
wb
=
uv_buf_init
(
conn
->
writeBuf
.
buf
,
conn
->
writeBuf
.
len
);
uv_timer_stop
(
conn
->
pTimer
);
uv_write
(
conn
->
pWriter
,
(
uv_stream_t
*
)
conn
->
pTcp
,
&
wb
,
1
,
uvOnWriteCb
);
uv_write
(
conn
->
pWriter
,
(
uv_stream_t
*
)
conn
->
pTcp
,
&
wb
,
1
,
uvOnWriteCb
);
}
}
}
}
...
...
source/libs/transport/test/rclient.c
浏览文件 @
48968822
...
@@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
...
@@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
rpcFreeCont
(
pMsg
->
pCont
);
rpcFreeCont
(
pMsg
->
pCont
);
// tsem_post(&pInfo->rspSem);
tsem_post
(
&
pInfo
->
rspSem
);
tsem_post
(
&
pInfo
->
rspSem
);
}
}
...
@@ -60,6 +61,7 @@ static void *sendRequest(void *param) {
...
@@ -60,6 +61,7 @@ static void *sendRequest(void *param) {
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest
(
pInfo
->
pRpc
,
&
pInfo
->
epSet
,
&
rpcMsg
,
NULL
);
rpcSendRequest
(
pInfo
->
pRpc
,
&
pInfo
->
epSet
,
&
rpcMsg
,
NULL
);
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
// tsem_wait(&pInfo->rspSem);
tsem_wait
(
&
pInfo
->
rspSem
);
tsem_wait
(
&
pInfo
->
rspSem
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录