Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
23b79e0d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
23b79e0d
编写于
5月 19, 2022
作者:
dengyihao
提交者:
GitHub
5月 19, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12708 from taosdata/enc/addUTtoCI
enh: add UT to CI
上级
6a65afb3
20e90670
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
192 addition
and
165 deletion
+192
-165
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+3
-3
source/libs/index/src/indexCache.c
source/libs/index/src/indexCache.c
+1
-1
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+16
-4
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+53
-53
source/libs/index/test/jsonUT.cc
source/libs/index/test/jsonUT.cc
+10
-10
source/libs/transport/test/CMakeLists.txt
source/libs/transport/test/CMakeLists.txt
+10
-0
source/libs/transport/test/transUT.cpp
source/libs/transport/test/transUT.cpp
+56
-49
source/libs/transport/test/transportTests.cpp
source/libs/transport/test/transportTests.cpp
+14
-6
source/libs/transport/test/uv.c
source/libs/transport/test/uv.c
+29
-39
未找到文件。
include/libs/transport/trpc.h
浏览文件 @
23b79e0d
...
...
@@ -38,7 +38,7 @@ typedef struct {
typedef
struct
SRpcHandleInfo
{
// rpc info
void
*
handle
;
// rpc handle returned to app
void
*
handle
;
// rpc handle returned to app
int64_t
refId
;
// refid, used by server
int32_t
noResp
;
// has response or not(default 0, 0: resp, 1: no resp);
int32_t
persistHandle
;
// persist handle or not
...
...
@@ -49,13 +49,13 @@ typedef struct SRpcHandleInfo {
void
*
node
;
// node mgmt handle
// resp info
void
*
rsp
;
void
*
rsp
;
int32_t
rspLen
;
}
SRpcHandleInfo
;
typedef
struct
SRpcMsg
{
tmsg_t
msgType
;
void
*
pCont
;
void
*
pCont
;
int32_t
contLen
;
int32_t
code
;
SRpcHandleInfo
info
;
...
...
source/libs/index/src/indexCache.c
浏览文件 @
23b79e0d
...
...
@@ -22,7 +22,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD
102
4 * 1024
#define MEM_THRESHOLD
6
4 * 1024
#define MEM_ESTIMATE_RADIO 1.5
static
void
indexMemRef
(
MemTable
*
tbl
);
...
...
source/libs/index/test/CMakeLists.txt
浏览文件 @
23b79e0d
...
...
@@ -92,7 +92,19 @@ target_link_libraries (jsonUT
index
)
#add_test(
# NAME index_test
# COMMAND indexTest
#)
add_test
(
NAME idxtest
COMMAND indexTest
)
add_test
(
NAME idxJsonUT
COMMAND jsonUT
)
add_test
(
NAME idxUtilUT
COMMAND UtilUT
)
add_test
(
NAME idxFstUT
COMMAND fstUT
)
source/libs/index/test/indexTests.cc
浏览文件 @
23b79e0d
...
...
@@ -714,7 +714,7 @@ class IndexObj {
return
numOfTable
;
}
int
ReadMultiMillonData
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
=
"Hello world"
,
size_t
numOfTable
=
100
*
10000
)
{
size_t
numOfTable
=
100
)
{
std
::
string
tColVal
=
colVal
;
int
colValSize
=
tColVal
.
size
();
...
...
@@ -896,7 +896,7 @@ TEST_F(IndexEnv2, testIndex_TrigeFlush) {
// r
std
::
cout
<<
"failed to init"
<<
std
::
endl
;
}
int
numOfTable
=
100
*
100
00
;
int
numOfTable
=
100
*
100
;
index
->
WriteMillonData
(
"tag1"
,
"Hello Wolrd"
,
numOfTable
);
int
target
=
index
->
SearchOne
(
"tag1"
,
"Hello Wolrd"
);
std
::
cout
<<
"Get Index: "
<<
target
<<
std
::
endl
;
...
...
@@ -910,8 +910,8 @@ static void single_write_and_search(IndexObj* idx) {
static
void
multi_write_and_search
(
IndexObj
*
idx
)
{
int
target
=
idx
->
SearchOne
(
"tag1"
,
"Hello"
);
target
=
idx
->
SearchOne
(
"tag2"
,
"Test"
);
idx
->
WriteMultiMillonData
(
"tag1"
,
"hello world test"
,
100
*
100
00
);
idx
->
WriteMultiMillonData
(
"tag2"
,
"world test nothing"
,
100
*
10
000
);
idx
->
WriteMultiMillonData
(
"tag1"
,
"hello world test"
,
100
*
100
);
idx
->
WriteMultiMillonData
(
"tag2"
,
"world test nothing"
,
100
*
10
);
}
TEST_F
(
IndexEnv2
,
testIndex_serarch_cache_and_tfile
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
...
...
@@ -920,8 +920,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
}
index
->
PutOne
(
"tag1"
,
"Hello"
);
index
->
PutOne
(
"tag2"
,
"Test"
);
index
->
WriteMultiMillonData
(
"tag1"
,
"Hello"
,
100
*
100
00
);
index
->
WriteMultiMillonData
(
"tag2"
,
"Test"
,
100
*
100
00
);
index
->
WriteMultiMillonData
(
"tag1"
,
"Hello"
,
100
*
100
);
index
->
WriteMultiMillonData
(
"tag2"
,
"Test"
,
100
*
100
);
std
::
thread
threads
[
NUM_OF_THREAD
];
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
...
...
@@ -949,49 +949,49 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
}
}
TEST_F
(
IndexEnv2
,
testIndex_restart
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
SearchOneTarget
(
"tag1"
,
"Hello"
,
10
);
index
->
SearchOneTarget
(
"tag2"
,
"Test"
,
10
);
}
TEST_F
(
IndexEnv2
,
testIndex_restart1
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
ReadMultiMillonData
(
"tag1"
,
"coding"
);
index
->
SearchOneTarget
(
"tag1"
,
"Hello"
,
10
);
index
->
SearchOneTarget
(
"tag2"
,
"Test"
,
10
);
}
//
TEST_F(IndexEnv2, testIndex_restart) {
//
std::string path = "/tmp/cache_and_tfile";
//
if (index->Init(path) != 0) {
//
}
//
index->SearchOneTarget("tag1", "Hello", 10);
//
index->SearchOneTarget("tag2", "Test", 10);
//
}
//
TEST_F(IndexEnv2, testIndex_restart1) {
//
std::string path = "/tmp/cache_and_tfile";
//
if (index->Init(path) != 0) {
//
}
//
index->ReadMultiMillonData("tag1", "coding");
//
index->SearchOneTarget("tag1", "Hello", 10);
//
index->SearchOneTarget("tag2", "Test", 10);
//
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag1"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndexMultiTag
)
{
std
::
string
path
=
"/tmp/multi_tag"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
int64_t
st
=
taosGetTimestampUs
();
int32_t
num
=
1000
*
10000
;
index
->
WriteMultiMillonData
(
"tag1"
,
"xxxxxxxxxxxxxxx"
,
num
);
std
::
cout
<<
"numOfRow: "
<<
num
<<
"
\t
time cost:"
<<
taosGetTimestampUs
()
-
st
<<
std
::
endl
;
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
}
//
TEST_F(IndexEnv2, testIndex_read_performance) {
//
std::string path = "/tmp/cache_and_tfile";
//
if (index->Init(path) != 0) {
//
}
//
index->PutOneTarge("tag1", "Hello", 12);
//
index->PutOneTarge("tag1", "Hello", 15);
//
index->ReadMultiMillonData("tag1", "Hello");
//
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
//
assert(3 == index->SearchOne("tag1", "Hello"));
//
}
//
TEST_F(IndexEnv2, testIndexMultiTag) {
//
std::string path = "/tmp/multi_tag";
//
if (index->Init(path) != 0) {
//
}
//
int64_t st = taosGetTimestampUs();
//
int32_t num = 1000 * 10000;
//
index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
//
std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
//
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
//
}
TEST_F
(
IndexEnv2
,
testLongComVal1
)
{
std
::
string
path
=
"/tmp/long_colVal"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
// gen colVal by randstr
std
::
string
randstr
=
"xxxxxxxxxxxxxxxxx"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
1000
0
);
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
1000
);
}
TEST_F
(
IndexEnv2
,
testLongComVal2
)
{
...
...
@@ -1000,7 +1000,7 @@ TEST_F(IndexEnv2, testLongComVal2) {
}
// gen colVal by randstr
std
::
string
randstr
=
"abcccc fdadfafdafda"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
1000
0
);
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
1000
);
}
TEST_F
(
IndexEnv2
,
testLongComVal3
)
{
std
::
string
path
=
"/tmp/long_colVal"
;
...
...
@@ -1008,7 +1008,7 @@ TEST_F(IndexEnv2, testLongComVal3) {
}
// gen colVal by randstr
std
::
string
randstr
=
"Yes, coding and coding and coding"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
1000
0
);
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
1000
);
}
TEST_F
(
IndexEnv2
,
testLongComVal4
)
{
std
::
string
path
=
"/tmp/long_colVal"
;
...
...
@@ -1016,7 +1016,7 @@ TEST_F(IndexEnv2, testLongComVal4) {
}
// gen colVal by randstr
std
::
string
randstr
=
"111111 bac fdadfa"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
100
00
);
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
100
);
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance1
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
...
...
@@ -1026,7 +1026,7 @@ TEST_F(IndexEnv2, testIndex_read_performance1) {
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
,
1000
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag1"
,
"Hello"
));
EXPECT_EQ
(
2
,
index
->
SearchOne
(
"tag1"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance2
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
...
...
@@ -1034,9 +1034,9 @@ TEST_F(IndexEnv2, testIndex_read_performance2) {
}
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
,
1000
*
10
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
,
1000
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag1"
,
"Hello"
));
EXPECT_EQ
(
2
,
index
->
SearchOne
(
"tag1"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance3
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
...
...
@@ -1044,9 +1044,9 @@ TEST_F(IndexEnv2, testIndex_read_performance3) {
}
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
,
1000
*
100
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
,
1000
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag1"
,
"Hello"
));
EXPECT_EQ
(
2
,
index
->
SearchOne
(
"tag1"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance4
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
...
...
@@ -1054,9 +1054,9 @@ TEST_F(IndexEnv2, testIndex_read_performance4) {
}
index
->
PutOneTarge
(
"tag10"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag12"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag10"
,
"Hello"
,
1000
*
100
);
index
->
ReadMultiMillonData
(
"tag10"
,
"Hello"
,
1000
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag10"
,
"Hello"
));
EXPECT_EQ
(
1
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_cache_del
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
...
...
@@ -1108,7 +1108,7 @@ TEST_F(IndexEnv2, testIndex_del) {
index
->
Del
(
"tag10"
,
"Hello"
,
11
);
EXPECT_EQ
(
98
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
index
->
WriteMultiMillonData
(
"tag10"
,
"xxxxxxxxxxxxxx"
,
100
*
100
00
);
index
->
WriteMultiMillonData
(
"tag10"
,
"xxxxxxxxxxxxxx"
,
100
*
100
);
index
->
Del
(
"tag10"
,
"Hello"
,
17
);
EXPECT_EQ
(
97
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
}
source/libs/index/test/jsonUT.cc
浏览文件 @
23b79e0d
...
...
@@ -154,7 +154,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
10
0
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
10
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
...
...
@@ -162,14 +162,14 @@ TEST_F(JsonEnv, testWriteMillonData) {
{
std
::
string
colName
(
"voltagefdadfa"
);
std
::
string
colVal
(
"abxxxxxxxxxxxx"
);
for
(
int
i
=
0
;
i
<
10
00
;
i
++
)
{
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
colVal
[
i
%
colVal
.
size
()]
=
'0'
+
i
%
128
;
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
100
0
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
100
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
...
...
@@ -199,7 +199,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
tIndexJsonSearch
(
index
,
mq
,
result
);
assert
(
100
==
taosArrayGetSize
(
result
));
EXPECT_EQ
(
10
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
{
...
...
@@ -229,7 +229,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
tIndexJsonSearch
(
index
,
mq
,
result
);
assert
(
100
==
taosArrayGetSize
(
result
));
EXPECT_EQ
(
10
,
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
}
...
...
@@ -385,7 +385,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
1000
00
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
1000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
...
...
@@ -523,7 +523,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT2) {
{
int
val
=
10
;
std
::
string
colName
(
"test1"
);
for
(
int
i
=
0
;
i
<
1000
0
;
i
++
)
{
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
val
+=
1
;
WriteData
(
index
,
colName
,
TSDB_DATA_TYPE_INT
,
&
val
,
sizeof
(
val
),
i
);
}
...
...
@@ -532,7 +532,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT2) {
int
val
=
10
;
std
::
string
colName
(
"test2xxx"
);
std
::
string
colVal
(
"xxxxxxxxxxxxxxx"
);
for
(
int
i
=
0
;
i
<
1000
00
;
i
++
)
{
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
val
+=
1
;
WriteData
(
index
,
colName
,
TSDB_DATA_TYPE_BINARY
,
(
void
*
)(
colVal
.
c_str
()),
colVal
.
size
(),
i
);
}
...
...
@@ -542,14 +542,14 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT2) {
std
::
string
colName
(
"test1"
);
int
val
=
9
;
Search
(
index
,
colName
,
TSDB_DATA_TYPE_INT
,
&
val
,
sizeof
(
val
),
QUERY_GREATER_EQUAL
,
&
res
);
EXPECT_EQ
(
1000
0
,
taosArrayGetSize
(
res
));
EXPECT_EQ
(
1000
,
taosArrayGetSize
(
res
));
}
{
SArray
*
res
=
NULL
;
std
::
string
colName
(
"test2xxx"
);
std
::
string
colVal
(
"xxxxxxxxxxxxxxx"
);
Search
(
index
,
colName
,
TSDB_DATA_TYPE_BINARY
,
(
void
*
)(
colVal
.
c_str
()),
colVal
.
size
(),
QUERY_TERM
,
&
res
);
EXPECT_EQ
(
1000
00
,
taosArrayGetSize
(
res
));
EXPECT_EQ
(
1000
,
taosArrayGetSize
(
res
));
}
}
TEST_F
(
JsonEnv
,
testWriteJsonTfileAndCache_FLOAT
)
{
...
...
source/libs/transport/test/CMakeLists.txt
浏览文件 @
23b79e0d
...
...
@@ -110,3 +110,13 @@ target_link_libraries (pushServer
transport
)
add_test
(
NAME transUT
COMMAND transUT
)
add_test
(
NAME transUtilUt
COMMAND transportTest
)
source/libs/transport/test/transUT.cpp
浏览文件 @
23b79e0d
...
...
@@ -43,6 +43,7 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
class
Client
{
public:
void
Init
(
int
nThread
)
{
memcpy
(
tsTempDir
,
"/tmp"
,
strlen
(
"/tmp"
));
memset
(
&
rpcInit_
,
0
,
sizeof
(
rpcInit_
));
rpcInit_
.
localPort
=
0
;
rpcInit_
.
label
=
(
char
*
)
label
;
...
...
@@ -107,7 +108,10 @@ class Client {
class
Server
{
public:
Server
()
{
memcpy
(
tsTempDir
,
"/tmp"
,
strlen
(
"/tmp"
));
memset
(
&
rpcInit_
,
0
,
sizeof
(
rpcInit_
));
memcpy
(
rpcInit_
.
localFqdn
,
"localhost"
,
strlen
(
"localhost"
));
rpcInit_
.
localPort
=
port
;
rpcInit_
.
label
=
(
char
*
)
label
;
rpcInit_
.
numOfThreads
=
5
;
...
...
@@ -300,12 +304,14 @@ TEST_F(TransEnv, 02StopServer) {
for
(
int
i
=
0
;
i
<
1
;
i
++
)
{
SRpcMsg
req
=
{
0
},
resp
=
{
0
};
req
.
msgType
=
0
;
req
.
info
.
ahandle
=
(
void
*
)
0x35
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
assert
(
resp
.
code
==
0
);
}
SRpcMsg
req
=
{
0
},
resp
=
{
0
};
req
.
info
.
ahandle
=
(
void
*
)
0x35
;
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
...
...
@@ -388,6 +394,7 @@ TEST_F(TransEnv, cliReleaseHandleExcept) {
memset
(
&
req
,
0
,
sizeof
(
req
));
req
.
info
=
resp
.
info
;
req
.
info
.
persistHandle
=
1
;
req
.
info
.
ahandle
=
(
void
*
)
1234
;
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
...
...
@@ -406,12 +413,12 @@ TEST_F(TransEnv, srvContinueSend) {
tr
->
SetSrvContinueSend
(
processContinueSend
);
SRpcMsg
req
=
{
0
},
resp
=
{
0
};
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
memset
(
&
req
,
0
,
sizeof
(
req
));
memset
(
&
resp
,
0
,
sizeof
(
resp
));
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
//
memset(&req, 0, sizeof(req));
//
memset(&resp, 0, sizeof(resp));
//
req.msgType = 1;
//
req.pCont = rpcMallocCont(10);
//
req.contLen = 10;
//
tr->cliSendAndRecv(&req, &resp);
}
taosMsleep
(
1000
);
}
...
...
@@ -422,16 +429,16 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
SRpcMsg
resp
=
{
0
};
SRpcMsg
req
=
{
0
};
for
(
int
i
=
0
;
i
<
5
;
i
++
)
{
memset
(
&
req
,
0
,
sizeof
(
req
));
req
.
info
=
resp
.
info
;
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
if
(
i
>
2
)
{
tr
->
StopCli
();
break
;
}
//
memset(&req, 0, sizeof(req));
//
req.info = resp.info;
//
req.msgType = 1;
//
req.pCont = rpcMallocCont(10);
//
req.contLen = 10;
//
tr->cliSendAndRecv(&req, &resp);
//
if (i > 2) {
//
tr->StopCli();
//
break;
//
}
}
taosMsleep
(
2000
);
// conn broken
...
...
@@ -442,16 +449,16 @@ TEST_F(TransEnv, cliPersistHandleExcept) {
SRpcMsg
resp
=
{
0
};
SRpcMsg
req
=
{
0
};
for
(
int
i
=
0
;
i
<
5
;
i
++
)
{
memset
(
&
req
,
0
,
sizeof
(
req
));
req
.
info
=
resp
.
info
;
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
if
(
i
>
2
)
{
tr
->
StopSrv
();
break
;
}
//
memset(&req, 0, sizeof(req));
//
req.info = resp.info;
//
req.msgType = 1;
//
req.pCont = rpcMallocCont(10);
//
req.contLen = 10;
//
tr->cliSendAndRecv(&req, &resp);
//
if (i > 2) {
//
tr->StopSrv();
//
break;
//
}
}
taosMsleep
(
2000
);
// conn broken
...
...
@@ -465,34 +472,34 @@ TEST_F(TransEnv, queryExcept) {
tr
->
SetSrvContinueSend
(
processRegisterFailure
);
SRpcMsg
resp
=
{
0
};
SRpcMsg
req
=
{
0
};
for
(
int
i
=
0
;
i
<
5
;
i
++
)
{
memset
(
&
req
,
0
,
sizeof
(
req
));
req
.
info
=
resp
.
info
;
req
.
info
.
persistHandle
=
1
;
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
if
(
i
==
2
)
{
rpcReleaseHandle
(
resp
.
info
.
handle
,
TAOS_CONN_CLIENT
);
tr
->
StopCli
();
break
;
}
}
//
for (int i = 0; i < 5; i++) {
//
memset(&req, 0, sizeof(req));
//
req.info = resp.info;
//
req.info.persistHandle = 1;
//
req.msgType = 1;
//
req.pCont = rpcMallocCont(10);
//
req.contLen = 10;
//
tr->cliSendAndRecv(&req, &resp);
//
if (i == 2) {
//
rpcReleaseHandle(resp.info.handle, TAOS_CONN_CLIENT);
//
tr->StopCli();
//
break;
//
}
//
}
taosMsleep
(
4
*
1000
);
}
TEST_F
(
TransEnv
,
noResp
)
{
SRpcMsg
resp
=
{
0
};
SRpcMsg
req
=
{
0
};
for
(
int
i
=
0
;
i
<
5
;
i
++
)
{
memset
(
&
req
,
0
,
sizeof
(
req
));
req
.
info
.
noResp
=
1
;
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
}
taosMsleep
(
2000
);
//
for (int i = 0; i < 5; i++) {
//
memset(&req, 0, sizeof(req));
//
req.info.noResp = 1;
//
req.msgType = 1;
//
req.pCont = rpcMallocCont(10);
//
req.contLen = 10;
//
tr->cliSendAndRecv(&req, &resp);
//
}
//
taosMsleep(2000);
// no resp
}
source/libs/transport/test/transportTests.cpp
浏览文件 @
23b79e0d
...
...
@@ -150,20 +150,26 @@ class TransCtxEnv : public ::testing::Test {
STransCtx
*
ctx
;
};
int32_t
cloneVal
(
void
*
src
,
void
**
dst
)
{
int
sz
=
(
int
)
strlen
((
char
*
)
src
);
*
dst
=
taosMemoryCalloc
(
1
,
sz
+
1
);
memcpy
(
*
dst
,
src
,
sz
);
return
0
;
}
TEST_F
(
TransCtxEnv
,
mergeTest
)
{
int
key
=
1
;
{
STransCtx
*
src
=
(
STransCtx
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STransCtx
));
transCtxInit
(
src
);
{
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
val1
.
val
=
taosMemoryMalloc
(
12
);
taosHashPut
(
src
->
args
,
&
key
,
sizeof
(
key
),
&
val1
,
sizeof
(
val1
));
key
++
;
}
{
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
val1
.
val
=
taosMemoryMalloc
(
12
);
taosHashPut
(
src
->
args
,
&
key
,
sizeof
(
key
),
&
val1
,
sizeof
(
val1
));
key
++
;
...
...
@@ -176,14 +182,14 @@ TEST_F(TransCtxEnv, mergeTest) {
STransCtx
*
src
=
(
STransCtx
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STransCtx
));
transCtxInit
(
src
);
{
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
val1
.
val
=
taosMemoryMalloc
(
12
);
taosHashPut
(
src
->
args
,
&
key
,
sizeof
(
key
),
&
val1
,
sizeof
(
val1
));
key
++
;
}
{
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
val1
.
val
=
taosMemoryMalloc
(
12
);
taosHashPut
(
src
->
args
,
&
key
,
sizeof
(
key
),
&
val1
,
sizeof
(
val1
));
key
++
;
...
...
@@ -198,16 +204,18 @@ TEST_F(TransCtxEnv, mergeTest) {
STransCtx
*
src
=
(
STransCtx
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STransCtx
));
transCtxInit
(
src
);
{
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
val1
.
val
=
taosMemoryCalloc
(
1
,
11
);
val1
.
clone
=
cloneVal
;
memcpy
(
val1
.
val
,
val
.
c_str
(),
val
.
size
());
taosHashPut
(
src
->
args
,
&
key
,
sizeof
(
key
),
&
val1
,
sizeof
(
val1
));
key
++
;
}
{
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
STransCtxVal
val1
=
{
NULL
,
NULL
,
(
void
(
*
)(
const
void
*
))
taosMemoryFree
};
val1
.
val
=
taosMemoryCalloc
(
1
,
11
);
val1
.
clone
=
cloneVal
;
memcpy
(
val1
.
val
,
val
.
c_str
(),
val
.
size
());
taosHashPut
(
src
->
args
,
&
key
,
sizeof
(
key
),
&
val1
,
sizeof
(
val1
));
key
++
;
...
...
source/libs/transport/test/uv.c
浏览文件 @
23b79e0d
#include <assert.h>
#include <uv.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#include "task.h"
#define NUM_OF_THREAD 1
#define TIMEOUT 10000
#define TIMEOUT
10000
typedef
struct
SThreadObj
{
TdThread
thread
;
uv_pipe_t
*
pipe
;
uv_loop_t
*
loop
;
uv_async_t
*
workerAsync
;
//
int
fd
;
TdThread
thread
;
uv_pipe_t
*
pipe
;
uv_loop_t
*
loop
;
uv_async_t
*
workerAsync
;
//
int
fd
;
}
SThreadObj
;
typedef
struct
SServerObj
{
uv_tcp_t
server
;
uv_loop_t
*
loop
;
int
workerIdx
;
int
numOfThread
;
uv_tcp_t
server
;
uv_loop_t
*
loop
;
int
workerIdx
;
int
numOfThread
;
SThreadObj
**
pThreadObj
;
uv_pipe_t
**
pipe
;
uv_pipe_t
**
pipe
;
}
SServerObj
;
typedef
struct
SConnCtx
{
uv_tcp_t
*
pClient
;
uv_tcp_t
*
pClient
;
uv_timer_t
*
pTimer
;
uv_async_t
*
pWorkerAsync
;
int
ref
;
int
ref
;
}
SConnCtx
;
void
echo_write
(
uv_write_t
*
req
,
int
status
)
{
...
...
@@ -42,7 +42,6 @@ void echo_write(uv_write_t *req, int status) {
}
void
echo_read
(
uv_stream_t
*
client
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
SConnCtx
*
pConn
=
container_of
(
client
,
SConnCtx
,
pClient
);
pConn
->
ref
+=
1
;
printf
(
"read data %d
\n
"
,
nread
,
buf
->
base
,
buf
->
len
);
...
...
@@ -59,8 +58,7 @@ void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
if
(
nread
<
0
)
{
if
(
nread
!=
UV_EOF
)
fprintf
(
stderr
,
"Read error %s
\n
"
,
uv_err_name
(
nread
));
if
(
nread
!=
UV_EOF
)
fprintf
(
stderr
,
"Read error %s
\n
"
,
uv_err_name
(
nread
));
uv_close
((
uv_handle_t
*
)
client
,
NULL
);
}
taosMemoryFree
(
buf
->
base
);
...
...
@@ -83,21 +81,19 @@ void on_new_connection(uv_stream_t *s, int status) {
uv_tcp_init
(
pObj
->
loop
,
client
);
if
(
uv_accept
(
s
,
(
uv_stream_t
*
)
client
)
==
0
)
{
uv_write_t
*
write_req
=
(
uv_write_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_write_t
));
uv_buf_t
dummy_buf
=
uv_buf_init
(
"a"
,
1
);
uv_buf_t
dummy_buf
=
uv_buf_init
(
"a"
,
1
);
// despatch to worker thread
pObj
->
workerIdx
=
(
pObj
->
workerIdx
+
1
)
%
pObj
->
numOfThread
;
uv_write2
(
write_req
,
(
uv_stream_t
*
)
&
(
pObj
->
pipe
[
pObj
->
workerIdx
][
0
]),
&
dummy_buf
,
1
,
(
uv_stream_t
*
)
client
,
echo_write
);
uv_write2
(
write_req
,
(
uv_stream_t
*
)
&
(
pObj
->
pipe
[
pObj
->
workerIdx
][
0
]),
&
dummy_buf
,
1
,
(
uv_stream_t
*
)
client
,
echo_write
);
}
else
{
uv_close
((
uv_handle_t
*
)
client
,
NULL
);
}
}
void
child_on_new_connection
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
void
child_on_new_connection
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
printf
(
"x child_on_new_connection
\n
"
);
if
(
nread
<
0
)
{
if
(
nread
!=
UV_EOF
)
fprintf
(
stderr
,
"Read error %s
\n
"
,
uv_err_name
(
nread
));
if
(
nread
!=
UV_EOF
)
fprintf
(
stderr
,
"Read error %s
\n
"
,
uv_err_name
(
nread
));
uv_close
((
uv_handle_t
*
)
q
,
NULL
);
return
;
}
...
...
@@ -119,7 +115,7 @@ void child_on_new_connection(uv_stream_t *q, ssize_t nread,
uv_timer_init
(
pObj
->
loop
,
pConn
->
pTimer
);
pConn
->
pClient
=
(
uv_tcp_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_tcp_t
));
pConn
->
pWorkerAsync
=
pObj
->
workerAsync
;
// thread safty
pConn
->
pWorkerAsync
=
pObj
->
workerAsync
;
// thread safty
uv_tcp_init
(
pObj
->
loop
,
pConn
->
pClient
);
if
(
uv_accept
(
q
,
(
uv_stream_t
*
)(
pConn
->
pClient
))
==
0
)
{
...
...
@@ -143,7 +139,7 @@ static void workerAsyncCallback(uv_async_t *handle) {
}
void
*
worker_thread
(
void
*
arg
)
{
SThreadObj
*
pObj
=
(
SThreadObj
*
)
arg
;
int
fd
=
pObj
->
fd
;
int
fd
=
pObj
->
fd
;
pObj
->
loop
=
(
uv_loop_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
uv_loop_init
(
pObj
->
loop
);
...
...
@@ -152,19 +148,16 @@ void *worker_thread(void *arg) {
pObj
->
workerAsync
=
taosMemoryMalloc
(
sizeof
(
uv_async_t
));
uv_async_init
(
pObj
->
loop
,
pObj
->
workerAsync
,
workerAsyncCallback
);
uv_read_start
((
uv_stream_t
*
)
pObj
->
pipe
,
alloc_buffer
,
child_on_new_connection
);
uv_read_start
((
uv_stream_t
*
)
pObj
->
pipe
,
alloc_buffer
,
child_on_new_connection
);
uv_run
(
pObj
->
loop
,
UV_RUN_DEFAULT
);
}
int
main
()
{
SServerObj
*
server
=
taosMemoryCalloc
(
1
,
sizeof
(
SServerObj
));
server
->
loop
=
(
uv_loop_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
server
->
numOfThread
=
NUM_OF_THREAD
;
server
->
workerIdx
=
0
;
server
->
pThreadObj
=
(
SThreadObj
**
)
taosMemoryCalloc
(
server
->
numOfThread
,
sizeof
(
SThreadObj
*
));
server
->
pThreadObj
=
(
SThreadObj
**
)
taosMemoryCalloc
(
server
->
numOfThread
,
sizeof
(
SThreadObj
*
));
server
->
pipe
=
(
uv_pipe_t
**
)
taosMemoryCalloc
(
server
->
numOfThread
,
sizeof
(
uv_pipe_t
*
));
uv_loop_init
(
server
->
loop
);
...
...
@@ -173,17 +166,15 @@ int main() {
server
->
pThreadObj
[
i
]
=
(
SThreadObj
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SThreadObj
));
server
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
taosMemoryCalloc
(
2
,
sizeof
(
uv_pipe_t
));
int
fds
[
2
];
if
(
uv_socketpair
(
AF_UNIX
,
SOCK_STREAM
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
if
(
uv_socketpair
(
AF_UNIX
,
SOCK_STREAM
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
return
-
1
;
}
uv_pipe_init
(
server
->
loop
,
&
(
server
->
pipe
[
i
][
0
]),
1
);
uv_pipe_open
(
&
(
server
->
pipe
[
i
][
0
]),
fds
[
1
]);
// init write
uv_pipe_open
(
&
(
server
->
pipe
[
i
][
0
]),
fds
[
1
]);
// init write
server
->
pThreadObj
[
i
]
->
fd
=
fds
[
0
];
server
->
pThreadObj
[
i
]
->
pipe
=
&
(
server
->
pipe
[
i
][
1
]);
// init read
int
err
=
taosThreadCreate
(
&
(
server
->
pThreadObj
[
i
]
->
thread
),
NULL
,
worker_thread
,
(
void
*
)(
server
->
pThreadObj
[
i
]));
server
->
pThreadObj
[
i
]
->
pipe
=
&
(
server
->
pipe
[
i
][
1
]);
// init read
int
err
=
taosThreadCreate
(
&
(
server
->
pThreadObj
[
i
]
->
thread
),
NULL
,
worker_thread
,
(
void
*
)(
server
->
pThreadObj
[
i
]));
if
(
err
==
0
)
{
printf
(
"thread %d create
\n
"
,
i
);
}
else
{
...
...
@@ -195,8 +186,7 @@ int main() {
uv_ip4_addr
(
"0.0.0.0"
,
7000
,
&
bind_addr
);
uv_tcp_bind
(
&
server
->
server
,
(
const
struct
sockaddr
*
)
&
bind_addr
,
0
);
int
err
=
0
;
if
((
err
=
uv_listen
((
uv_stream_t
*
)
&
server
->
server
,
128
,
on_new_connection
))
!=
0
)
{
if
((
err
=
uv_listen
((
uv_stream_t
*
)
&
server
->
server
,
128
,
on_new_connection
))
!=
0
)
{
fprintf
(
stderr
,
"Listen error %s
\n
"
,
uv_err_name
(
err
));
return
2
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录