Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
570f9c3a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
570f9c3a
编写于
8月 18, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
merge main
上级
3712e8c6
46ec10e1
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
30 addition
and
14 deletion
+30
-14
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+6
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+14
-6
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+0
-1
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+10
-7
未找到文件。
source/libs/transport/inc/transComm.h
浏览文件 @
570f9c3a
...
...
@@ -104,6 +104,12 @@ typedef void* queue[2];
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
typedef
SRpcMsg
STransMsg
;
typedef
SRpcCtx
STransCtx
;
typedef
SRpcCtxVal
STransCtxVal
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
570f9c3a
...
...
@@ -318,10 +318,17 @@ void cliHandleResp(SCliConn* conn) {
}
STransMsgHead
*
pHead
=
NULL
;
transDumpFromBuffer
(
&
conn
->
readBuf
,
(
char
**
)
&
pHead
);
if
(
transDumpFromBuffer
(
&
conn
->
readBuf
,
(
char
**
)
&
pHead
)
<=
0
)
{
tDebug
(
"%s conn %p recv invalid packet "
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
return
;
}
pHead
->
code
=
htonl
(
pHead
->
code
);
pHead
->
msgLen
=
htonl
(
pHead
->
msgLen
);
if
(
cliRecvReleaseReq
(
conn
,
pHead
))
{
return
;
}
STransMsg
transMsg
=
{
0
};
transMsg
.
contLen
=
transContLenFromMsg
(
pHead
->
msgLen
);
transMsg
.
pCont
=
transContFromHead
((
char
*
)
pHead
);
...
...
@@ -333,10 +340,6 @@ void cliHandleResp(SCliConn* conn) {
SCliMsg
*
pMsg
=
NULL
;
STransConnCtx
*
pCtx
=
NULL
;
if
(
cliRecvReleaseReq
(
conn
,
pHead
))
{
return
;
}
if
(
CONN_NO_PERSIST_BY_APP
(
conn
))
{
pMsg
=
transQueuePop
(
&
conn
->
cliMsgs
);
...
...
@@ -598,7 +601,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
pBuf
->
len
+=
nread
;
while
(
transReadComplete
(
pBuf
))
{
tTrace
(
"%s conn %p read complete"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
cliHandleResp
(
conn
);
if
(
pBuf
->
invalid
)
{
cliHandleExcept
(
conn
);
break
;
}
else
{
cliHandleResp
(
conn
);
}
}
return
;
}
...
...
source/libs/transport/src/transComm.c
浏览文件 @
570f9c3a
...
...
@@ -188,7 +188,6 @@ bool transReadComplete(SConnBuffer* connBuf) {
p
->
left
=
0
;
}
}
return
(
p
->
left
==
0
||
p
->
invalid
)
?
true
:
false
;
}
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
570f9c3a
...
...
@@ -198,15 +198,16 @@ static bool uvHandleReq(SSvrConn* pConn) {
pHead
->
msgLen
=
htonl
(
pHead
->
msgLen
);
memcpy
(
pConn
->
user
,
pHead
->
user
,
strlen
(
pHead
->
user
));
if
(
uvRecvReleaseReq
(
pConn
,
pHead
))
{
return
true
;
}
// TODO(dengyihao): time-consuming task throwed into BG Thread
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
// wreq->data = pConn;
// uv_read_stop((uv_stream_t*)pConn->pTcp);
// transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
if
(
uvRecvReleaseReq
(
pConn
,
pHead
))
{
return
true
;
}
STransMsg
transMsg
;
memset
(
&
transMsg
,
0
,
sizeof
(
transMsg
));
...
...
@@ -274,14 +275,16 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if
(
pBuf
->
len
<=
TRANS_PACKET_LIMIT
)
{
while
(
transReadComplete
(
pBuf
))
{
tTrace
(
"%s conn %p alread read complete packet"
,
transLabel
(
pTransInst
),
conn
);
if
(
uvHandleReq
(
conn
)
==
false
)
{
if
(
pBuf
->
invalid
)
{
tTrace
(
"%s conn %p alread read invalid packet"
,
transLabel
(
pTransInst
),
conn
);
destroyConn
(
conn
,
true
);
return
;
}
else
{
if
(
false
==
uvHandleReq
(
conn
))
break
;
}
}
return
;
}
else
{
tError
(
"%s conn %p read unexpected packet, exceed limit"
,
transLabel
(
pTransInst
),
conn
);
destroyConn
(
conn
,
true
);
return
;
}
...
...
@@ -872,6 +875,7 @@ static int reallocConnRef(SSvrConn* conn) {
}
static
void
uvDestroyConn
(
uv_handle_t
*
handle
)
{
SSvrConn
*
conn
=
handle
->
data
;
if
(
conn
==
NULL
)
{
return
;
}
...
...
@@ -887,9 +891,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
SSvrMsg
*
msg
=
transQueueGet
(
&
conn
->
srvMsgs
,
i
);
destroySmsg
(
msg
);
}
transReqQueueClear
(
&
conn
->
wreqQueue
);
transQueueDestroy
(
&
conn
->
srvMsgs
);
transReqQueueClear
(
&
conn
->
wreqQueue
);
QUEUE_REMOVE
(
&
conn
->
queue
);
taosMemoryFree
(
conn
->
pTcp
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录