Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5941437b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
5941437b
编写于
2月 22, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor code
上级
5050ce67
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
26 addition
and
82 deletion
+26
-82
source/dnode/mgmt/impl/test/sut/src/client.cpp
source/dnode/mgmt/impl/test/sut/src/client.cpp
+4
-1
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+5
-4
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+1
-29
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+15
-7
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+1
-41
未找到文件。
source/dnode/mgmt/impl/test/sut/src/client.cpp
浏览文件 @
5941437b
...
...
@@ -24,6 +24,9 @@ static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) {
}
void
TestClient
::
SetRpcRsp
(
SRpcMsg
*
rsp
)
{
if
(
this
->
pRsp
)
{
free
(
this
->
pRsp
);
}
this
->
pRsp
=
(
SRpcMsg
*
)
calloc
(
1
,
sizeof
(
SRpcMsg
));
this
->
pRsp
->
msgType
=
rsp
->
msgType
;
this
->
pRsp
->
code
=
rsp
->
code
;
...
...
@@ -60,7 +63,7 @@ bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint
strcpy
(
this
->
user
,
user
);
strcpy
(
this
->
pass
,
pass
);
this
->
port
=
port
;
//
this->pRsp = NULL;
this
->
pRsp
=
NULL
;
this
->
DoInit
();
return
true
;
}
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
5941437b
...
...
@@ -238,10 +238,11 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
void
transDestroyAsyncPool
(
SAsyncPool
*
pool
);
int
transSendAsync
(
SAsyncPool
*
pool
,
queue
*
mq
);
int
transInitBuffer
(
SConnBuffer
*
buf
);
int
transClearBuffer
(
SConnBuffer
*
buf
);
int
transDestroyBuffer
(
SConnBuffer
*
buf
);
int
transAllocBuffer
(
SConnBuffer
*
connBuf
,
uv_buf_t
*
uvBuf
);
int
transInitBuffer
(
SConnBuffer
*
buf
);
int
transClearBuffer
(
SConnBuffer
*
buf
);
int
transDestroyBuffer
(
SConnBuffer
*
buf
);
int
transAllocBuffer
(
SConnBuffer
*
connBuf
,
uv_buf_t
*
uvBuf
);
bool
transReadComplete
(
SConnBuffer
*
connBuf
);
// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
5941437b
...
...
@@ -84,8 +84,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co
// register timer in each thread to clear expire conn
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
);
// check whether already read complete packet from server
static
bool
clientReadComplete
(
SConnBuffer
*
pBuf
);
// alloc buf for read
static
void
clientAllocBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
// callback after read nbytes from socket
...
...
@@ -309,32 +307,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
assert
(
plist
!=
NULL
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
conn
);
}
static
bool
clientReadComplete
(
SConnBuffer
*
data
)
{
if
(
data
->
len
>=
sizeof
(
STransMsgHead
))
{
STransMsgHead
head
;
memcpy
((
char
*
)
&
head
,
data
->
buf
,
sizeof
(
head
));
int32_t
msgLen
=
(
int32_t
)
htonl
(
head
.
msgLen
);
data
->
total
=
msgLen
;
}
if
(
data
->
len
==
data
->
cap
&&
data
->
total
==
data
->
cap
)
{
return
true
;
}
return
false
;
// if (data->len >= headLen) {
// memcpy((char*)&head, data->buf, headLen);
// int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
// if (msgLen > data->len) {
// data->left = msgLen - data->len;
// return false;
// } else if (msgLen == data->len) {
// data->left = 0;
// return true;
// }
//} else {
// return false;
//}
}
static
void
clientAllocBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
SCliConn
*
conn
=
handle
->
data
;
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
...
...
@@ -349,7 +321,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
if
(
client
ReadComplete
(
pBuf
))
{
if
(
trans
ReadComplete
(
pBuf
))
{
tTrace
(
"client conn %p read complete"
,
conn
);
clientHandleResp
(
conn
);
}
else
{
...
...
source/libs/transport/src/transComm.c
浏览文件 @
5941437b
...
...
@@ -222,23 +222,31 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
p
->
buf
=
(
char
*
)
calloc
(
CAPACITY
,
sizeof
(
char
));
p
->
len
=
0
;
p
->
cap
=
CAPACITY
;
p
->
total
=
0
;
p
->
total
=
-
1
;
uvBuf
->
base
=
p
->
buf
;
uvBuf
->
len
=
CAPACITY
;
}
else
{
STransMsgHead
head
;
memcpy
((
char
*
)
&
head
,
p
->
buf
,
sizeof
(
head
));
int32_t
msgLen
=
(
int32_t
)
htonl
(
head
.
msgLen
);
p
->
total
=
msgLen
;
p
->
cap
=
msgLen
;
p
->
cap
=
p
->
total
;
p
->
buf
=
realloc
(
p
->
buf
,
p
->
cap
);
uvBuf
->
base
=
p
->
buf
+
p
->
len
;
uvBuf
->
len
=
p
->
cap
-
p
->
len
;
}
return
0
;
}
// check whether already read complete
bool
transReadComplete
(
SConnBuffer
*
connBuf
)
{
if
(
connBuf
->
total
==
-
1
&&
connBuf
->
len
>=
sizeof
(
STransMsgHead
))
{
STransMsgHead
head
;
memcpy
((
char
*
)
&
head
,
connBuf
->
buf
,
sizeof
(
head
));
int32_t
msgLen
=
(
int32_t
)
htonl
(
head
.
msgLen
);
connBuf
->
total
=
msgLen
;
}
if
(
connBuf
->
len
==
connBuf
->
cap
&&
connBuf
->
total
==
connBuf
->
cap
)
{
return
true
;
}
return
false
;
}
int
transPackMsg
(
STransMsgHead
*
msgHead
,
bool
sercured
,
bool
auth
)
{}
int
transUnpackMsg
(
STransMsgHead
*
msgHead
)
{}
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
5941437b
...
...
@@ -104,7 +104,6 @@ static void uvStartSendResp(SSrvMsg* msg);
static
void
destroySmsg
(
SSrvMsg
*
smsg
);
// check whether already read complete packet
static
bool
readComplete
(
SConnBuffer
*
buf
);
static
SSrvConn
*
createConn
(
void
*
hThrd
);
static
void
destroyConn
(
SSrvConn
*
conn
,
bool
clear
/*clear handle or not*/
);
...
...
@@ -124,45 +123,6 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
transAllocBuffer
(
pBuf
,
buf
);
}
// check data read from socket complete or not
//
static
bool
readComplete
(
SConnBuffer
*
data
)
{
// TODO(yihao): handle pipeline later
if
(
data
->
len
==
data
->
cap
&&
data
->
total
==
data
->
cap
)
{
return
true
;
}
return
false
;
// STransMsgHead head;
// int32_t headLen = sizeof(head);
// if (data->len >= headLen) {
// memcpy((char*)&head, data->buf, headLen);
// int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
// if (msgLen > data->len) {
// data->left = msgLen - data->len;
// return false;
// } else if (msgLen == data->len) {
// return true;
// } else if (msgLen < data->len) {
// return false;
// // handle other packet later
// }
//} else {
// return false;
//}
}
// static void uvDoProcess(SRecvInfo* pRecv) {
// // impl later
// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
// SSrvConn* pConn = pRecv->thandle;
// tDump(pRecv->msg, pRecv->msgLen);
// terrno = 0;
// // SRpcReqContext* pContest;
//
// // do auth and check
//}
static
int
uvAuthMsg
(
SSrvConn
*
pConn
,
char
*
msg
,
int
len
)
{
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)
msg
;
...
...
@@ -283,7 +243,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
tTrace
(
"server conn %p read summary, total read: %d, current read: %d"
,
conn
,
pBuf
->
len
,
(
int
)
nread
);
if
(
r
eadComplete
(
pBuf
))
{
if
(
transR
eadComplete
(
pBuf
))
{
tTrace
(
"server conn %p alread read complete packet"
,
conn
);
uvHandleReq
(
conn
);
}
else
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录