Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9cf07ff8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9cf07ff8
编写于
2月 21, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add more UT test
上级
507573d5
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
184 addition
and
50 deletion
+184
-50
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+0
-1
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+16
-3
source/libs/transport/test/transUT.cc
source/libs/transport/test/transUT.cc
+168
-46
未找到文件。
source/libs/transport/src/transCli.c
浏览文件 @
9cf07ff8
...
@@ -350,7 +350,6 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
...
@@ -350,7 +350,6 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if
(
nread
>
0
)
{
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
pBuf
->
len
+=
nread
;
if
(
clientReadComplete
(
pBuf
))
{
if
(
clientReadComplete
(
pBuf
))
{
// uv_read_stop((uv_stream_t*)conn->stream);
tTrace
(
"client conn %p read complete"
,
conn
);
tTrace
(
"client conn %p read complete"
,
conn
);
clientHandleResp
(
conn
);
clientHandleResp
(
conn
);
}
else
{
}
else
{
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
9cf07ff8
...
@@ -96,6 +96,7 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status);
...
@@ -96,6 +96,7 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status);
static
void
uvOnConnectionCb
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
static
void
uvOnConnectionCb
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
static
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
);
static
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
);
static
void
uvAcceptAsyncCb
(
uv_async_t
*
handle
);
static
void
uvAcceptAsyncCb
(
uv_async_t
*
handle
);
static
void
uvShutDownCb
(
uv_shutdown_t
*
req
,
int
status
);
static
void
uvStartSendRespInternal
(
SSrvMsg
*
smsg
);
static
void
uvStartSendRespInternal
(
SSrvMsg
*
smsg
);
static
void
uvPrepareSendData
(
SSrvMsg
*
msg
,
uv_buf_t
*
wb
);
static
void
uvPrepareSendData
(
SSrvMsg
*
msg
,
uv_buf_t
*
wb
);
...
@@ -446,6 +447,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
...
@@ -446,6 +447,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
free
(
msg
);
free
(
msg
);
destroyAllConn
(
pThrd
);
destroyAllConn
(
pThrd
);
uv_loop_close
(
pThrd
->
loop
);
uv_stop
(
pThrd
->
loop
);
uv_stop
(
pThrd
->
loop
);
}
else
{
}
else
{
uvStartSendResp
(
msg
);
uvStartSendResp
(
msg
);
...
@@ -463,6 +466,12 @@ static void uvAcceptAsyncCb(uv_async_t* async) {
...
@@ -463,6 +466,12 @@ static void uvAcceptAsyncCb(uv_async_t* async) {
uv_stop
(
srv
->
loop
);
uv_stop
(
srv
->
loop
);
}
}
static
void
uvShutDownCb
(
uv_shutdown_t
*
req
,
int
status
)
{
tDebug
(
"conn failed to shut down: %s"
,
uv_err_name
(
status
));
uv_close
((
uv_handle_t
*
)
req
->
handle
,
uvDestroyConn
);
free
(
req
);
}
void
uvOnAcceptCb
(
uv_stream_t
*
stream
,
int
status
)
{
void
uvOnAcceptCb
(
uv_stream_t
*
stream
,
int
status
)
{
if
(
status
==
-
1
)
{
if
(
status
==
-
1
)
{
return
;
return
;
...
@@ -528,8 +537,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
...
@@ -528,8 +537,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_tcp_init
(
pThrd
->
loop
,
pConn
->
pTcp
);
uv_tcp_init
(
pThrd
->
loop
,
pConn
->
pTcp
);
pConn
->
pTcp
->
data
=
pConn
;
pConn
->
pTcp
->
data
=
pConn
;
uv_tcp_nodelay
(
pConn
->
pTcp
,
1
);
//
uv_tcp_nodelay(pConn->pTcp, 1);
uv_tcp_keepalive
(
pConn
->
pTcp
,
1
,
1
);
//
uv_tcp_keepalive(pConn->pTcp, 1, 1);
// init write request, just
// init write request, just
pConn
->
pWriter
=
calloc
(
1
,
sizeof
(
uv_write_t
));
pConn
->
pWriter
=
calloc
(
1
,
sizeof
(
uv_write_t
));
...
@@ -656,7 +665,11 @@ static void destroyConn(SSrvConn* conn, bool clear) {
...
@@ -656,7 +665,11 @@ static void destroyConn(SSrvConn* conn, bool clear) {
QUEUE_REMOVE
(
&
conn
->
queue
);
QUEUE_REMOVE
(
&
conn
->
queue
);
if
(
clear
)
{
if
(
clear
)
{
tTrace
(
"try to destroy conn %p"
,
conn
);
uv_tcp_close_reset
(
conn
->
pTcp
,
uvDestroyConn
);
uv_tcp_close_reset
(
conn
->
pTcp
,
uvDestroyConn
);
// uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
// uv_unref((uv_handle_t*)conn->pTcp);
// uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
// uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
}
}
}
}
...
@@ -665,7 +678,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
...
@@ -665,7 +678,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
tDebug
(
"server conn %p destroy"
,
conn
);
tDebug
(
"server conn %p destroy"
,
conn
);
uv_timer_stop
(
conn
->
pTimer
);
uv_timer_stop
(
conn
->
pTimer
);
// free(conn->pTimer);
// free(conn->pTimer);
//
free(conn->pTcp);
free
(
conn
->
pTcp
);
free
(
conn
->
pWriter
);
free
(
conn
->
pWriter
);
free
(
conn
);
free
(
conn
);
}
}
...
...
source/libs/transport/test/transUT.cc
浏览文件 @
9cf07ff8
...
@@ -16,69 +16,168 @@
...
@@ -16,69 +16,168 @@
#include <cstdio>
#include <cstdio>
#include <cstring>
#include <cstring>
#include "tep.h"
#include "tep.h"
#include "tglobal.h"
#include "trpc.h"
#include "trpc.h"
#include "ulog.h"
using
namespace
std
;
using
namespace
std
;
class
TransObj
{
const
char
*
label
=
"APP"
;
public:
const
char
*
secret
=
"secret"
;
TransObj
()
{
const
char
*
user
=
"user"
;
const
char
*
label
=
"APP"
;
const
char
*
ckey
=
"ckey"
;
const
char
*
secret
=
"secret"
;
const
char
*
user
=
"user"
;
const
char
*
ckey
=
"ckey"
;
class
Server
;
int
port
=
7000
;
// server process
static
void
processReq
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
// client process;
static
void
processResp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
class
Client
{
public:
void
Init
(
int
nThread
)
{
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
(
char
*
)
label
;
rpcInit
.
label
=
(
char
*
)
label
;
rpcInit
.
numOfThreads
=
5
;
rpcInit
.
numOfThreads
=
nThread
;
rpcInit
.
cfp
=
NULL
;
rpcInit
.
cfp
=
processResp
;
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
secret
=
(
char
*
)
secret
;
rpcInit
.
secret
=
(
char
*
)
secret
;
rpcInit
.
ckey
=
(
char
*
)
ckey
;
rpcInit
.
ckey
=
(
char
*
)
ckey
;
rpcInit
.
spi
=
1
;
rpcInit
.
spi
=
1
;
}
rpcInit
.
parent
=
this
;
bool
startCli
()
{
trans
=
NULL
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
t
rans
=
rpcOpen
(
&
rpcInit
);
t
his
->
transCli
=
rpcOpen
(
&
rpcInit
);
return
trans
!=
NULL
?
true
:
false
;
tsem_init
(
&
this
->
sem
,
0
,
0
)
;
}
}
bool
startSrv
()
{
void
SetResp
(
SRpcMsg
*
pMsg
)
{
trans
=
NULL
;
// set up resp;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
this
->
resp
=
*
pMsg
;
trans
=
rpcOpen
(
&
rpcInit
);
}
return
trans
!=
NULL
?
true
:
false
;
SRpcMsg
*
Resp
()
{
return
&
this
->
resp
;
}
void
Restart
()
{
rpcClose
(
this
->
transCli
);
this
->
transCli
=
rpcOpen
(
&
rpcInit
);
}
}
bool
sendAndRecv
(
)
{
void
SendAndRecv
(
SRpcMsg
*
req
,
SRpcMsg
*
resp
)
{
SEpSet
epSet
=
{
0
};
SEpSet
epSet
=
{
0
};
epSet
.
inUse
=
0
;
epSet
.
inUse
=
0
;
addEpIntoEpSet
(
&
epSet
,
"192.168.1.1"
,
7000
);
addEpIntoEpSet
(
&
epSet
,
"127.0.0.1"
,
7000
);
addEpIntoEpSet
(
&
epSet
,
"192.168.0.1"
,
7000
);
if
(
trans
==
NULL
)
{
rpcSendRequest
(
this
->
transCli
,
&
epSet
,
req
,
NULL
);
return
false
;
SemWait
();
}
*
resp
=
this
->
resp
;
SRpcMsg
rpcMsg
=
{
0
},
reqMsg
=
{
0
};
reqMsg
.
pCont
=
rpcMallocCont
(
10
);
reqMsg
.
contLen
=
10
;
reqMsg
.
ahandle
=
NULL
;
rpcSendRecv
(
trans
,
&
epSet
,
&
reqMsg
,
&
rpcMsg
);
int
code
=
rpcMsg
.
code
;
std
::
cout
<<
tstrerror
(
code
)
<<
std
::
endl
;
return
true
;
}
}
bool
stop
()
{
void
SemWait
()
{
tsem_wait
(
&
this
->
sem
);
}
rpcClose
(
trans
);
void
SemPost
()
{
tsem_post
(
&
this
->
sem
);
}
trans
=
NULL
;
void
Reset
()
{}
return
true
;
~
Client
()
{
if
(
this
->
transCli
)
rpcClose
(
this
->
transCli
);
}
}
private:
private:
void
*
trans
;
tsem_t
sem
;
SRpcInit
rpcInit
;
SRpcInit
rpcInit
;
void
*
transCli
;
SRpcMsg
resp
;
};
class
Server
{
public:
Server
()
{
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
port
;
rpcInit
.
label
=
(
char
*
)
label
;
rpcInit
.
numOfThreads
=
5
;
rpcInit
.
cfp
=
processReq
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
secret
=
(
char
*
)
secret
;
rpcInit
.
ckey
=
(
char
*
)
ckey
;
rpcInit
.
spi
=
1
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
}
void
Start
()
{
this
->
transSrv
=
rpcOpen
(
&
this
->
rpcInit
);
taosMsleep
(
1000
);
}
void
Stop
()
{
if
(
this
->
transSrv
==
NULL
)
return
;
rpcClose
(
this
->
transSrv
);
this
->
transSrv
=
NULL
;
}
void
Restart
()
{
this
->
Stop
();
this
->
Start
();
}
~
Server
()
{
if
(
this
->
transSrv
)
rpcClose
(
this
->
transSrv
);
this
->
transSrv
=
NULL
;
}
private:
SRpcInit
rpcInit
;
void
*
transSrv
;
};
static
void
processReq
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
rpcMallocCont
(
100
);
rpcMsg
.
contLen
=
100
;
rpcMsg
.
handle
=
pMsg
->
handle
;
rpcMsg
.
code
=
0
;
rpcSendResponse
(
&
rpcMsg
);
}
// client process;
static
void
processResp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
Client
*
client
=
(
Client
*
)
parent
;
client
->
SetResp
(
pMsg
);
client
->
SemPost
();
}
class
TransObj
{
public:
TransObj
()
{
dDebugFlag
=
143
;
vDebugFlag
=
0
;
mDebugFlag
=
143
;
cDebugFlag
=
0
;
jniDebugFlag
=
0
;
tmrDebugFlag
=
143
;
uDebugFlag
=
143
;
rpcDebugFlag
=
143
;
qDebugFlag
=
0
;
wDebugFlag
=
0
;
sDebugFlag
=
0
;
tsdbDebugFlag
=
0
;
cqDebugFlag
=
0
;
tscEmbeddedInUtil
=
1
;
tsAsyncLog
=
0
;
std
::
string
path
=
"/tmp/transport"
;
taosRemoveDir
(
path
.
c_str
());
taosMkDir
(
path
.
c_str
());
char
temp
[
PATH_MAX
];
snprintf
(
temp
,
PATH_MAX
,
"%s/taosdlog"
,
path
.
c_str
());
if
(
taosInitLog
(
temp
,
tsNumOfLogLines
,
1
)
!=
0
)
{
printf
(
"failed to init log file
\n
"
);
}
cli
=
new
Client
;
cli
->
Init
(
1
);
srv
=
new
Server
;
srv
->
Start
();
}
void
RestartCli
()
{
cli
->
Restart
();
}
void
StopSrv
()
{
srv
->
Stop
();
}
void
RestartSrv
()
{
srv
->
Restart
();
}
void
cliSendAndRecv
(
SRpcMsg
*
req
,
SRpcMsg
*
resp
)
{
cli
->
SendAndRecv
(
req
,
resp
);
}
~
TransObj
()
{
delete
cli
;
delete
srv
;
}
private:
Client
*
cli
;
Server
*
srv
;
};
};
class
TransEnv
:
public
::
testing
::
Test
{
class
TransEnv
:
public
::
testing
::
Test
{
protected:
protected:
...
@@ -93,11 +192,34 @@ class TransEnv : public ::testing::Test {
...
@@ -93,11 +192,34 @@ class TransEnv : public ::testing::Test {
TransObj
*
tr
=
NULL
;
TransObj
*
tr
=
NULL
;
};
};
TEST_F
(
TransEnv
,
test_start_stop
)
{
assert
(
tr
->
startCli
());
assert
(
tr
->
sendAndRecv
());
assert
(
tr
->
stop
());
assert
(
tr
->
startSrv
());
// TEST_F(TransEnv, 01sendAndRec) {
assert
(
tr
->
stop
());
// for (int i = 0; i < 1; i++) {
// SRpcMsg req = {0}, resp = {0};
// req.msgType = 0;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
// assert(resp.code == 0);
// }
//}
TEST_F
(
TransEnv
,
02
StopServer
)
{
for
(
int
i
=
0
;
i
<
1
;
i
++
)
{
SRpcMsg
req
=
{
0
},
resp
=
{
0
};
req
.
msgType
=
0
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
assert
(
resp
.
code
==
0
);
}
SRpcMsg
req
=
{
0
},
resp
=
{
0
};
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
StopSrv
();
// tr->RestartSrv();
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
assert
(
resp
.
code
!=
0
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录