Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7edf647a
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
7edf647a
编写于
1月 24, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
1月 24, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9999 from taosdata/feature/trans
handle server/client except
上级
26197a8c
b34a5bdf
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
85 addition
and
58 deletion
+85
-58
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+3
-3
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+56
-36
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+25
-18
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+1
-1
未找到文件。
source/libs/transport/inc/transComm.h
浏览文件 @
7edf647a
...
...
@@ -123,7 +123,7 @@ typedef struct {
}
SRpcReqContext
;
typedef
struct
{
SRpcInfo
*
p
Rpc
;
// associated SRpcInfo
SRpcInfo
*
p
TransInst
;
// associated SRpcInfo
SEpSet
epSet
;
// ip list provided by app
void
*
ahandle
;
// handle provided by app
// struct SRpcConn* pConn; // pConn allocated
...
...
source/libs/transport/src/transCli.c
浏览文件 @
7edf647a
...
...
@@ -30,6 +30,7 @@ typedef struct SCliConn {
char
spi
;
char
secured
;
uint64_t
expireTime
;
int8_t
notifyCount
;
// timers already notify to client
}
SCliConn
;
typedef
struct
SCliMsg
{
...
...
@@ -72,8 +73,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co
// register timer in each thread to clear expire conn
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
);
// process data read from server, auth/decompress etc later
static
void
clientHandleResp
(
SCliConn
*
conn
);
// check whether already read complete packet from server
static
bool
clientReadComplete
(
SConnBuffer
*
pBuf
);
// alloc buf for read
...
...
@@ -88,10 +87,15 @@ static void clientAsyncCb(uv_async_t* handle);
static
void
clientDestroy
(
uv_handle_t
*
handle
);
static
void
clientConnDestroy
(
SCliConn
*
pConn
,
bool
clear
/*clear tcp handle or not*/
);
static
void
clientMsgDestroy
(
SCliMsg
*
pMsg
);
// process data read from server, auth/decompress etc later
static
void
clientHandleResp
(
SCliConn
*
conn
);
// handle except about conn
static
void
clientHandleExcept
(
SCliConn
*
conn
);
// handle req from app
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientMsgDestroy
(
SCliMsg
*
pMsg
);
static
void
destroyTransConnCtx
(
STransConnCtx
*
ctx
);
// thread obj
static
SCliThrdObj
*
createThrdObj
();
static
void
destroyThrdObj
(
SCliThrdObj
*
pThrd
);
...
...
@@ -100,22 +104,38 @@ static void* clientThread(void* arg);
static
void
clientHandleResp
(
SCliConn
*
conn
)
{
STransConnCtx
*
pCtx
=
((
SCliMsg
*
)
conn
->
data
)
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
p
Rpc
;
SRpcInfo
*
pRpc
=
pCtx
->
p
TransInst
;
SRpcMsg
rpcMsg
;
rpcMsg
.
pCont
=
conn
->
readBuf
.
buf
;
rpcMsg
.
contLen
=
conn
->
readBuf
.
len
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
conn
->
notifyCount
+=
1
;
SCliThrdObj
*
pThrd
=
conn
->
hostThrd
;
addConnToPool
(
pThrd
->
pool
,
pCtx
->
ip
,
pCtx
->
port
,
conn
);
// start thread's timer of conn pool if not active
if
(
!
uv_is_active
((
uv_handle_t
*
)
pThrd
->
pTimer
)
&&
pRpc
->
idleTime
>
0
)
{
uv_timer_start
((
uv_timer_t
*
)
pThrd
->
pTimer
,
clientTimeoutCb
,
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
/
2
,
0
);
}
free
(
pCtx
->
ip
);
free
(
pCtx
);
// impl
destroyTransConnCtx
(
pCtx
);
}
static
void
clientHandleExcept
(
SCliConn
*
pConn
)
{
SCliMsg
*
pMsg
=
pConn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
pTransInst
;
SRpcMsg
rpcMsg
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
pConn
->
notifyCount
+=
1
;
destroyTransConnCtx
(
pCtx
);
clientConnDestroy
(
pConn
,
true
);
}
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
)
{
...
...
@@ -191,6 +211,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
SRpcInfo
*
pRpc
=
((
SCliThrdObj
*
)
conn
->
hostThrd
)
->
pTransInst
;
conn
->
expireTime
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pRpc
->
idleTime
);
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
conn
->
notifyCount
=
0
;
// list already create before
assert
(
plist
!=
NULL
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
conn
);
...
...
@@ -246,19 +267,21 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
if
(
clientReadComplete
(
pBuf
))
{
tDebug
(
"
alread read complete"
);
tDebug
(
"
conn %p read complete"
,
conn
);
clientHandleResp
(
conn
);
}
else
{
tDebug
(
"
read half packet, continue to read"
);
tDebug
(
"
conn %p read partial packet, continue to read"
,
conn
);
}
return
;
}
assert
(
nread
<=
0
);
if
(
nread
==
0
)
{
tError
(
"conn %p closed"
,
conn
);
return
;
}
if
(
nread
!=
UV_EOF
)
{
tDebug
(
"read error %s"
,
uv_err_name
(
nread
));
if
(
nread
<
0
)
{
tError
(
"conn %p read error: %s"
,
conn
,
uv_err_name
(
nread
));
clientHandleExcept
(
conn
);
}
// tDebug("Read error %s\n", uv_err_name(nread));
// uv_close((uv_handle_t*)handle, clientDestroy);
...
...
@@ -282,19 +305,20 @@ static void clientDestroy(uv_handle_t* handle) {
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
->
data
;
if
(
status
==
0
)
{
tDebug
(
"
data already was written on stream"
);
tDebug
(
"
conn %p data already was written out"
,
pConn
);
}
else
{
tError
(
"
failed to write: %s"
,
uv_err_name
(
status
));
client
ConnDestroy
(
pConn
,
true
);
tError
(
"
conn %p failed to write: %s"
,
pConn
,
uv_err_name
(
status
));
client
HandleExcept
(
pConn
);
return
;
}
SCliThrdObj
*
pThrd
=
pConn
->
hostThrd
;
if
(
pConn
->
stream
==
NULL
)
{
pConn
->
stream
=
(
uv_stream_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)
pConn
->
stream
);
pConn
->
stream
->
data
=
pConn
;
}
//
if (pConn->stream == NULL) {
//
pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
//
uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
//
pConn->stream->data = pConn;
//
}
uv_read_start
((
uv_stream_t
*
)
pConn
->
stream
,
clientAllocReadBufferCb
,
clientReadCb
);
// impl later
}
...
...
@@ -310,30 +334,19 @@ static void clientWrite(SCliConn* pConn) {
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
tDebug
(
"
data write out, msgType : %d, len: %d"
,
pHead
->
msgType
,
msgLen
);
tDebug
(
"
conn %p data write out, msgType : %d, len: %d"
,
pConn
,
pHead
->
msgType
,
msgLen
);
uv_write
(
pConn
->
writeReq
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
clientWriteCb
);
}
static
void
clientConnCb
(
uv_connect_t
*
req
,
int
status
)
{
// impl later
SCliConn
*
pConn
=
req
->
data
;
SCliMsg
*
pMsg
=
pConn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
pRpc
;
if
(
status
!=
0
)
{
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
tError
(
"failed to connect server, errmsg: %s"
,
uv_strerror
(
status
));
// call user fp later
SRpcMsg
rpcMsg
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
clientConnDestroy
(
pConn
,
true
);
// uv_close((uv_handle_t*)req->handle, clientDestroy);
tError
(
"conn %p failed to connect server: %s"
,
pConn
,
uv_strerror
(
status
));
clientHandleExcept
(
pConn
);
return
;
}
tDebug
(
"conn %p create"
,
pConn
);
assert
(
pConn
->
stream
==
req
->
handle
);
clientWrite
(
pConn
);
...
...
@@ -349,6 +362,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn
*
conn
=
getConnFromPool
(
pThrd
->
pool
,
pCtx
->
ip
,
pCtx
->
port
);
if
(
conn
!=
NULL
)
{
// impl later
tDebug
(
"conn %p get from conn pool"
,
conn
);
conn
->
data
=
pMsg
;
conn
->
writeReq
->
data
=
conn
;
...
...
@@ -462,6 +476,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
free
(
pThrd
->
loop
);
free
(
pThrd
);
}
static
void
destroyTransConnCtx
(
STransConnCtx
*
ctx
)
{
if
(
ctx
!=
NULL
)
{
free
(
ctx
->
ip
);
}
free
(
ctx
);
}
//
void
taosCloseClient
(
void
*
arg
)
{
// impl later
...
...
@@ -472,7 +493,6 @@ void taosCloseClient(void* arg) {
free
(
cli
->
pThreadObj
);
free
(
cli
);
}
void
rpcSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
pRid
)
{
// impl later
char
*
ip
=
(
char
*
)(
pEpSet
->
fqdn
[
pEpSet
->
inUse
]);
...
...
@@ -487,7 +507,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
STransConnCtx
*
pCtx
=
calloc
(
1
,
sizeof
(
STransConnCtx
));
pCtx
->
p
Rpc
=
(
SRpcInfo
*
)
shandle
;
pCtx
->
p
TransInst
=
(
SRpcInfo
*
)
shandle
;
pCtx
->
ahandle
=
pMsg
->
ahandle
;
pCtx
->
msgType
=
pMsg
->
msgType
;
pCtx
->
ip
=
strdup
(
ip
);
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
7edf647a
...
...
@@ -16,6 +16,7 @@
#ifdef USE_UV
#include "transComm.h"
typedef
struct
SConn
{
uv_tcp_t
*
pTcp
;
uv_write_t
*
pWriter
;
...
...
@@ -26,7 +27,6 @@ typedef struct SConn {
int
ref
;
int
persist
;
// persist connection or not
SConnBuffer
connBuf
;
// read buf,
int
count
;
int
inType
;
void
*
pTransInst
;
// rpc init
void
*
ahandle
;
//
...
...
@@ -226,7 +226,7 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug
(
"%p timeout since no activity"
,
conn
);
}
static
void
uv
ProcessData
(
SConn
*
pConn
)
{
static
void
uv
HandleReq
(
SConn
*
pConn
)
{
SRecvInfo
info
;
SRecvInfo
*
p
=
&
info
;
SConnBuffer
*
pBuf
=
&
pConn
->
connBuf
;
...
...
@@ -271,6 +271,7 @@ static void uvProcessData(SConn* pConn) {
rpcMsg
.
ahandle
=
NULL
;
rpcMsg
.
handle
=
pConn
;
pConn
->
ref
++
;
(
*
(
pRpc
->
cfp
))(
pRpc
->
parent
,
&
rpcMsg
,
NULL
);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
...
...
@@ -283,20 +284,23 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer
*
pBuf
=
&
conn
->
connBuf
;
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
tDebug
(
"
on read %p, total read: %d, current read: %d"
,
cli
,
pBuf
->
len
,
(
int
)
nread
);
tDebug
(
"
conn %p read summroy, total read: %d, current read: %d"
,
conn
,
pBuf
->
len
,
(
int
)
nread
);
if
(
readComplete
(
pBuf
))
{
tDebug
(
"
alread read complete packet"
);
uv
ProcessData
(
conn
);
tDebug
(
"
conn %p alread read complete packet"
,
conn
);
uv
HandleReq
(
conn
);
}
else
{
tDebug
(
"
read half packet, continue to read"
);
tDebug
(
"
conn %p read partial packet, continue to read"
,
conn
);
}
return
;
}
if
(
nread
==
0
)
{
tDebug
(
"conn %p except read"
,
conn
);
// destroyConn(conn, true);
return
;
}
if
(
nread
!=
UV_EOF
)
{
tDebug
(
"read error %s"
,
uv_err_name
(
nread
));
tDebug
(
"conn %p read error: %s"
,
conn
,
uv_err_name
(
nread
));
destroyConn
(
conn
,
true
);
}
}
void
uvAllocConnBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
...
...
@@ -306,7 +310,8 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void
uvOnTimeoutCb
(
uv_timer_t
*
handle
)
{
// opt
tDebug
(
"time out"
);
SConn
*
pConn
=
handle
->
data
;
tDebug
(
"conn %p time out"
,
pConn
);
}
void
uvOnWriteCb
(
uv_write_t
*
req
,
int
status
)
{
...
...
@@ -317,9 +322,9 @@ void uvOnWriteCb(uv_write_t* req, int status) {
memset
(
buf
->
buf
,
0
,
buf
->
cap
);
buf
->
left
=
-
1
;
if
(
status
==
0
)
{
tDebug
(
"
data already was written on stream"
);
tDebug
(
"
conn %p data already was written on stream"
,
conn
);
}
else
{
tDebug
(
"
failed to write data, %s"
,
uv_err_name
(
status
));
tDebug
(
"
conn %p failed to write data, %s"
,
conn
,
uv_err_name
(
status
));
destroyConn
(
conn
,
true
);
}
// opt
...
...
@@ -334,7 +339,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
static
void
uvPrepareSendData
(
SConn
*
conn
,
uv_buf_t
*
wb
)
{
// impl later;
tDebug
(
"
prepare to send back"
);
tDebug
(
"
conn %p prepare to send resp"
,
conn
);
SRpcMsg
*
pMsg
=
&
conn
->
sendMsg
;
if
(
pMsg
->
pCont
==
0
)
{
pMsg
->
pCont
=
(
void
*
)
rpcMallocCont
(
0
);
...
...
@@ -427,6 +432,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert
(
pending
==
UV_TCP
);
SConn
*
pConn
=
createConn
();
pConn
->
pTransInst
=
pThrd
->
pTransInst
;
/* init conn timer*/
pConn
->
pTimer
=
malloc
(
sizeof
(
uv_timer_t
));
...
...
@@ -448,7 +454,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if
(
uv_accept
(
q
,
(
uv_stream_t
*
)(
pConn
->
pTcp
))
==
0
)
{
uv_os_fd_t
fd
;
uv_fileno
((
const
uv_handle_t
*
)
pConn
->
pTcp
,
&
fd
);
tDebug
(
"
new connection created: %d"
,
fd
);
tDebug
(
"
conn %p created, fd: %d"
,
pConn
,
fd
);
uv_read_start
((
uv_stream_t
*
)(
pConn
->
pTcp
),
uvAllocReadBufferCb
,
uvOnReadCb
);
}
else
{
tDebug
(
"failed to create new connection"
);
...
...
@@ -515,19 +521,19 @@ void* workerThread(void* arg) {
static
SConn
*
createConn
()
{
SConn
*
pConn
=
(
SConn
*
)
calloc
(
1
,
sizeof
(
SConn
));
++
pConn
->
ref
;
return
pConn
;
}
static
void
connCloseCb
(
uv_handle_t
*
handle
)
{
// impl later
//
}
static
void
destroyConn
(
SConn
*
conn
,
bool
clear
)
{
if
(
conn
==
NULL
)
{
return
;
}
if
(
--
conn
->
ref
==
0
)
{
return
;
}
if
(
clear
)
{
uv_handle_t
handle
=
*
((
uv_handle_t
*
)
conn
->
pTcp
);
uv_close
(
&
handle
,
NULL
);
uv_close
((
uv_handle_t
*
)
conn
->
pTcp
,
NULL
);
}
uv_timer_stop
(
conn
->
pTimer
);
free
(
conn
->
pTimer
);
...
...
@@ -646,6 +652,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
pthread_mutex_lock
(
&
pThrd
->
connMtx
);
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
pthread_mutex_unlock
(
&
pThrd
->
connMtx
);
tDebug
(
"conn %p start to send resp"
,
pConn
);
uv_async_send
(
pConn
->
pWorkerAsync
);
}
...
...
source/libs/transport/test/rclient.c
浏览文件 @
7edf647a
...
...
@@ -63,7 +63,7 @@ static void *sendRequest(void *param) {
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
// tsem_wait(&pInfo->rspSem);
tsem_wait
(
&
pInfo
->
rspSem
);
tDebug
(
"recv response"
);
tDebug
(
"recv response
succefully
"
);
// usleep(100000000);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录