Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eb2eb6a5
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
eb2eb6a5
编写于
1月 20, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj
上级
41fb517f
0711ed80
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
223 addition
and
30 deletion
+223
-30
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+102
-12
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+1
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+107
-11
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+11
-7
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+2
-0
未找到文件。
source/libs/index/test/indexTests.cc
浏览文件 @
eb2eb6a5
...
@@ -82,7 +82,9 @@ class FstReadMemory {
...
@@ -82,7 +82,9 @@ class FstReadMemory {
bool
init
()
{
bool
init
()
{
char
*
buf
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
_size
);
char
*
buf
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
_size
);
int
nRead
=
fstCountingWriterRead
(
_w
,
(
uint8_t
*
)
buf
,
_size
);
int
nRead
=
fstCountingWriterRead
(
_w
,
(
uint8_t
*
)
buf
,
_size
);
if
(
nRead
<=
0
)
{
return
false
;
}
if
(
nRead
<=
0
)
{
return
false
;
}
_size
=
nRead
;
_size
=
nRead
;
_s
=
fstSliceCreate
((
uint8_t
*
)
buf
,
_size
);
_s
=
fstSliceCreate
((
uint8_t
*
)
buf
,
_size
);
_fst
=
fstCreate
(
&
_s
);
_fst
=
fstCreate
(
&
_s
);
...
@@ -108,7 +110,9 @@ class FstReadMemory {
...
@@ -108,7 +110,9 @@ class FstReadMemory {
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
result
.
push_back
((
uint64_t
)(
rt
->
out
.
out
));
}
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
result
.
push_back
((
uint64_t
)(
rt
->
out
.
out
));
}
return
true
;
return
true
;
}
}
bool
SearchWithTimeCostUs
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>&
result
)
{
bool
SearchWithTimeCostUs
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>&
result
)
{
...
@@ -184,7 +188,9 @@ void checkFstPerf() {
...
@@ -184,7 +188,9 @@ void checkFstPerf() {
delete
fw
;
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
())
{
printf
(
"success to init fst read"
);
}
if
(
m
->
init
())
{
printf
(
"success to init fst read"
);
}
Performance_fstReadRecords
(
m
);
Performance_fstReadRecords
(
m
);
delete
m
;
delete
m
;
}
}
...
@@ -348,7 +354,9 @@ class TFileObj {
...
@@ -348,7 +354,9 @@ class TFileObj {
tfileReaderDestroy
(
reader_
);
tfileReaderDestroy
(
reader_
);
reader_
=
NULL
;
reader_
=
NULL
;
}
}
if
(
writer_
==
NULL
)
{
InitWriter
();
}
if
(
writer_
==
NULL
)
{
InitWriter
();
}
return
tfileWriterPut
(
writer_
,
tv
,
false
);
return
tfileWriterPut
(
writer_
,
tv
,
false
);
}
}
bool
InitWriter
()
{
bool
InitWriter
()
{
...
@@ -388,8 +396,12 @@ class TFileObj {
...
@@ -388,8 +396,12 @@ class TFileObj {
return
tfileReaderSearch
(
reader_
,
query
,
result
);
return
tfileReaderSearch
(
reader_
,
query
,
result
);
}
}
~
TFileObj
()
{
~
TFileObj
()
{
if
(
writer_
)
{
tfileWriterDestroy
(
writer_
);
}
if
(
writer_
)
{
if
(
reader_
)
{
tfileReaderDestroy
(
reader_
);
}
tfileWriterDestroy
(
writer_
);
}
if
(
reader_
)
{
tfileReaderDestroy
(
reader_
);
}
}
}
private:
private:
...
@@ -912,7 +924,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
...
@@ -912,7 +924,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
}
}
TEST_F
(
IndexEnv2
,
testIndex_MultiWrite_and_MultiRead
)
{
TEST_F
(
IndexEnv2
,
testIndex_MultiWrite_and_MultiRead
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
if
(
index
->
Init
(
path
)
!=
0
)
{
}
std
::
thread
threads
[
NUM_OF_THREAD
];
std
::
thread
threads
[
NUM_OF_THREAD
];
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
...
@@ -927,14 +940,24 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
...
@@ -927,14 +940,24 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
TEST_F
(
IndexEnv2
,
testIndex_restart
)
{
TEST_F
(
IndexEnv2
,
testIndex_restart
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
SearchOneTarget
(
"tag1"
,
"Hello"
,
10
);
index
->
SearchOneTarget
(
"tag2"
,
"Test"
,
10
);
}
TEST_F
(
IndexEnv2
,
testIndex_restart1
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
ReadMultiMillonData
(
"tag1"
,
"coding"
);
index
->
SearchOneTarget
(
"tag1"
,
"Hello"
,
10
);
index
->
SearchOneTarget
(
"tag1"
,
"Hello"
,
10
);
index
->
SearchOneTarget
(
"tag2"
,
"Test"
,
10
);
index
->
SearchOneTarget
(
"tag2"
,
"Test"
,
10
);
}
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance
)
{
TEST_F
(
IndexEnv2
,
testIndex_read_performance
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
);
...
@@ -943,17 +966,84 @@ TEST_F(IndexEnv2, testIndex_read_performance) {
...
@@ -943,17 +966,84 @@ TEST_F(IndexEnv2, testIndex_read_performance) {
}
}
TEST_F
(
IndexEnv2
,
testIndexMultiTag
)
{
TEST_F
(
IndexEnv2
,
testIndexMultiTag
)
{
std
::
string
path
=
"/tmp/multi_tag"
;
std
::
string
path
=
"/tmp/multi_tag"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
if
(
index
->
Init
(
path
)
!=
0
)
{
}
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
int32_t
num
=
1000
*
10000
;
int32_t
num
=
1000
*
10000
;
index
->
WriteMultiMillonData
(
"tag1"
,
"xxxxxxxxxxxxxxx"
,
num
);
index
->
WriteMultiMillonData
(
"tag1"
,
"xxxxxxxxxxxxxxx"
,
num
);
std
::
cout
<<
"numOfRow: "
<<
num
<<
"
\t
time cost:"
<<
taosGetTimestampUs
()
-
st
<<
std
::
endl
;
std
::
cout
<<
"numOfRow: "
<<
num
<<
"
\t
time cost:"
<<
taosGetTimestampUs
()
-
st
<<
std
::
endl
;
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
}
}
TEST_F
(
IndexEnv2
,
testLongComVal
)
{
TEST_F
(
IndexEnv2
,
testLongComVal
1
)
{
std
::
string
path
=
"/tmp/long_colVal"
;
std
::
string
path
=
"/tmp/long_colVal"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
if
(
index
->
Init
(
path
)
!=
0
)
{
}
// gen colVal by randstr
// gen colVal by randstr
std
::
string
randstr
=
"xxxxxxxxxxxxxxxxx"
;
std
::
string
randstr
=
"xxxxxxxxxxxxxxxxx"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
10000
);
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
10000
);
}
}
TEST_F
(
IndexEnv2
,
testLongComVal2
)
{
std
::
string
path
=
"/tmp/long_colVal"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
// gen colVal by randstr
std
::
string
randstr
=
"abcccc fdadfafdafda"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
10000
);
}
TEST_F
(
IndexEnv2
,
testLongComVal3
)
{
std
::
string
path
=
"/tmp/long_colVal"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
// gen colVal by randstr
std
::
string
randstr
=
"Yes, coding and coding and coding"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
10000
);
}
TEST_F
(
IndexEnv2
,
testLongComVal4
)
{
std
::
string
path
=
"/tmp/long_colVal"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
// gen colVal by randstr
std
::
string
randstr
=
"111111 bac fdadfa"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
10000
);
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance1
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
,
1000
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag1"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance2
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
,
1000
*
10
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag1"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance3
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
,
1000
*
100
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag1"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_read_performance4
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
index
->
PutOneTarge
(
"tag10"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag12"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag10"
,
"Hello"
,
1000
*
100
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag10"
,
"Hello"
));
}
source/libs/transport/src/trans.c
浏览文件 @
eb2eb6a5
...
@@ -70,6 +70,7 @@ int32_t rpcInit(void) {
...
@@ -70,6 +70,7 @@ int32_t rpcInit(void) {
void
rpcCleanup
(
void
)
{
void
rpcCleanup
(
void
)
{
// impl later
// impl later
//
return
;
return
;
}
}
#endif
#endif
source/libs/transport/src/transCli.c
浏览文件 @
eb2eb6a5
...
@@ -26,6 +26,7 @@ typedef struct SCliConn {
...
@@ -26,6 +26,7 @@ typedef struct SCliConn {
queue
conn
;
queue
conn
;
char
spi
;
char
spi
;
char
secured
;
char
secured
;
uint64_t
expireTime
;
}
SCliConn
;
}
SCliConn
;
typedef
struct
SCliMsg
{
typedef
struct
SCliMsg
{
...
@@ -39,10 +40,13 @@ typedef struct SCliThrdObj {
...
@@ -39,10 +40,13 @@ typedef struct SCliThrdObj {
pthread_t
thread
;
pthread_t
thread
;
uv_loop_t
*
loop
;
uv_loop_t
*
loop
;
uv_async_t
*
cliAsync
;
//
uv_async_t
*
cliAsync
;
//
void
*
cache
;
// conn pool
uv_timer_t
*
pTimer
;
void
*
cache
;
// conn pool
queue
msg
;
queue
msg
;
pthread_mutex_t
msgMtx
;
pthread_mutex_t
msgMtx
;
void
*
shandle
;
uint64_t
nextTimeout
;
// next timeout
void
*
shandle
;
//
}
SCliThrdObj
;
}
SCliThrdObj
;
typedef
struct
SClientObj
{
typedef
struct
SClientObj
{
...
@@ -52,10 +56,19 @@ typedef struct SClientObj {
...
@@ -52,10 +56,19 @@ typedef struct SClientObj {
SCliThrdObj
**
pThreadObj
;
SCliThrdObj
**
pThreadObj
;
}
SClientObj
;
}
SClientObj
;
typedef
struct
SConnList
{
queue
conn
;
}
SConnList
;
// conn pool
// conn pool
// add expire timeout and capacity limit
static
void
*
connCacheCreate
(
int
size
);
static
void
*
connCacheDestroy
(
void
*
cache
);
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
);
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
);
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
);
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
);
// register timer in each thread to clear expire conn
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
);
// process data read from server, auth/decompress etc
// process data read from server, auth/decompress etc
static
void
clientProcessData
(
SCliConn
*
conn
);
static
void
clientProcessData
(
SCliConn
*
conn
);
// check whether already read complete packet from server
// check whether already read complete packet from server
...
@@ -77,10 +90,93 @@ static void clientMsgDestroy(SCliMsg* pMsg);
...
@@ -77,10 +90,93 @@ static void clientMsgDestroy(SCliMsg* pMsg);
static
void
*
clientThread
(
void
*
arg
);
static
void
*
clientThread
(
void
*
arg
);
static
void
clientProcessData
(
SCliConn
*
conn
)
{
static
void
clientProcessData
(
SCliConn
*
conn
)
{
STransConnCtx
*
pCtx
=
((
SCliMsg
*
)
conn
->
data
)
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
ahandle
;
SRpcMsg
rpcMsg
;
rpcMsg
.
pCont
=
conn
->
readBuf
.
buf
;
rpcMsg
.
contLen
=
conn
->
readBuf
.
len
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
// impl
// impl
}
}
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
)
{
SCliThrdObj
*
pThrd
=
handle
->
data
;
SRpcInfo
*
pRpc
=
pThrd
->
shandle
;
int64_t
currentTime
=
pThrd
->
nextTimeout
;
SConnList
*
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
cache
,
NULL
);
while
(
p
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
p
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
p
->
conn
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
if
(
c
->
expireTime
<
currentTime
)
{
QUEUE_REMOVE
(
h
);
clientConnDestroy
(
c
);
}
else
{
break
;
}
}
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
cache
,
p
);
}
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
uv_timer_start
(
handle
,
clientTimeoutCb
,
pRpc
->
idleTime
*
10
,
0
);
}
static
void
*
connCacheCreate
(
int
size
)
{
SHashObj
*
cache
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
return
false
;
}
static
void
*
connCacheDestroy
(
void
*
cache
)
{
SConnList
*
connList
=
taosHashIterate
((
SHashObj
*
)
cache
,
NULL
);
while
(
connList
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
connList
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
connList
->
conn
);
QUEUE_REMOVE
(
h
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
clientConnDestroy
(
c
);
}
connList
=
taosHashIterate
((
SHashObj
*
)
cache
,
connList
);
}
taosHashClear
(
cache
);
}
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
)
{
char
key
[
128
]
=
{
0
};
tstrncpy
(
key
,
ip
,
strlen
(
ip
));
tstrncpy
(
key
+
strlen
(
key
),
(
char
*
)(
&
port
),
sizeof
(
port
));
SHashObj
*
pCache
=
cache
;
SConnList
*
plist
=
taosHashGet
(
pCache
,
key
,
strlen
(
key
));
if
(
plist
==
NULL
)
{
SConnList
list
;
plist
=
&
list
;
QUEUE_INIT
(
&
plist
->
conn
);
taosHashPut
(
pCache
,
key
,
strlen
(
key
),
plist
,
sizeof
(
*
plist
));
}
if
(
QUEUE_IS_EMPTY
(
&
plist
->
conn
))
{
return
NULL
;
}
queue
*
h
=
QUEUE_HEAD
(
&
plist
->
conn
);
QUEUE_REMOVE
(
h
);
return
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
}
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
)
{
char
key
[
128
]
=
{
0
};
tstrncpy
(
key
,
ip
,
strlen
(
ip
));
tstrncpy
(
key
+
strlen
(
key
),
(
char
*
)(
&
port
),
sizeof
(
port
));
STransConnCtx
*
ctx
=
((
SCliMsg
*
)
conn
->
data
)
->
ctx
;
SRpcInfo
*
pRpc
=
ctx
->
pRpc
;
conn
->
expireTime
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
cache
,
key
,
strlen
(
key
));
// list already create before
assert
(
plist
!=
NULL
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
conn
);
}
static
bool
clientReadComplete
(
SConnBuffer
*
data
)
{
static
bool
clientReadComplete
(
SConnBuffer
*
data
)
{
STransMsgHead
head
;
STransMsgHead
head
;
int32_t
headLen
=
sizeof
(
head
);
int32_t
headLen
=
sizeof
(
head
);
...
@@ -152,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) {
...
@@ -152,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) {
}
}
static
void
clientDestroy
(
uv_handle_t
*
handle
)
{
static
void
clientDestroy
(
uv_handle_t
*
handle
)
{
SCliConn
*
conn
=
handle
->
data
;
SCliConn
*
conn
=
handle
->
data
;
QUEUE_REMOVE
(
&
conn
->
conn
);
clientConnDestroy
(
conn
);
clientConnDestroy
(
conn
);
}
}
...
@@ -206,15 +303,6 @@ static void clientConnCb(uv_connect_t* req, int status) {
...
@@ -206,15 +303,6 @@ static void clientConnCb(uv_connect_t* req, int status) {
clientWrite
(
pConn
);
clientWrite
(
pConn
);
}
}
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
)
{
// impl later
return
NULL
;
}
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
)
{
// impl later
}
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
uint64_t
et
=
taosGetTimestampUs
();
uint64_t
et
=
taosGetTimestampUs
();
uint64_t
el
=
et
-
pMsg
->
st
;
uint64_t
el
=
et
-
pMsg
->
st
;
...
@@ -234,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
...
@@ -234,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn
->
stream
=
(
uv_stream_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
conn
->
stream
=
(
uv_stream_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
conn
->
writeReq
=
malloc
(
sizeof
(
uv_write_t
));
conn
->
writeReq
=
malloc
(
sizeof
(
uv_write_t
));
QUEUE_INIT
(
&
conn
->
conn
);
conn
->
connReq
.
data
=
conn
;
conn
->
connReq
.
data
=
conn
;
conn
->
data
=
pMsg
;
conn
->
data
=
pMsg
;
...
@@ -270,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) {
...
@@ -270,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) {
static
void
*
clientThread
(
void
*
arg
)
{
static
void
*
clientThread
(
void
*
arg
)
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
SRpcInfo
*
pRpc
=
pThrd
->
shandle
;
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
uv_timer_start
(
pThrd
->
pTimer
,
clientTimeoutCb
,
pRpc
->
idleTime
*
10
,
0
);
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
}
}
...
@@ -291,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
...
@@ -291,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
pThrd
->
cliAsync
->
data
=
pThrd
;
pThrd
->
cliAsync
->
data
=
pThrd
;
pThrd
->
pTimer
=
malloc
(
sizeof
(
uv_timer_t
));
uv_timer_init
(
pThrd
->
loop
,
pThrd
->
pTimer
);
pThrd
->
shandle
=
shandle
;
pThrd
->
shandle
=
shandle
;
int
err
=
pthread_create
(
&
pThrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
pThrd
));
int
err
=
pthread_create
(
&
pThrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
pThrd
));
if
(
err
==
0
)
{
if
(
err
==
0
)
{
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
eb2eb6a5
...
@@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
...
@@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
// refers specifically to query or insert timeout
// refers specifically to query or insert timeout
static
void
uvHandleActivityTimeout
(
uv_timer_t
*
handle
)
{
static
void
uvHandleActivityTimeout
(
uv_timer_t
*
handle
)
{
// impl later
SConn
*
conn
=
handle
->
data
;
SConn
*
conn
=
handle
->
data
;
tDebug
(
"%p timeout since no activity"
,
conn
);
}
}
static
void
uvProcessData
(
SConn
*
pConn
)
{
static
void
uvProcessData
(
SConn
*
pConn
)
{
...
@@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) {
...
@@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) {
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
p
->
shandle
;
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
p
->
shandle
;
// auth here
// auth here
// auth should not do in rpc thread
int8_t
code
=
uvAuthMsg
(
pConn
,
(
char
*
)
pHead
,
p
->
msgLen
);
//
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
if
(
code
!=
0
)
{
//
if (code != 0) {
terrno
=
code
;
//
terrno = code;
return
;
//
return;
}
//
}
pHead
->
code
=
htonl
(
pHead
->
code
);
pHead
->
code
=
htonl
(
pHead
->
code
);
int32_t
dlen
=
0
;
int32_t
dlen
=
0
;
...
@@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) {
...
@@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) {
}
else
{
}
else
{
// impl later
// impl later
}
}
rpcMsg
.
contLen
=
rpc
ContLenFromMsg
(
pHead
->
msgLen
);
rpcMsg
.
contLen
=
trans
ContLenFromMsg
(
pHead
->
msgLen
);
rpcMsg
.
pCont
=
pHead
->
content
;
rpcMsg
.
pCont
=
pHead
->
content
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
code
=
pHead
->
code
;
rpcMsg
.
code
=
pHead
->
code
;
...
@@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
...
@@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
return
;
return
;
}
}
uv_buf_t
wb
=
uv_buf_init
(
conn
->
writeBuf
.
buf
,
conn
->
writeBuf
.
len
);
uv_buf_t
wb
=
uv_buf_init
(
conn
->
writeBuf
.
buf
,
conn
->
writeBuf
.
len
);
uv_timer_stop
(
conn
->
pTimer
);
uv_write
(
conn
->
pWriter
,
(
uv_stream_t
*
)
conn
->
pTcp
,
&
wb
,
1
,
uvOnWriteCb
);
uv_write
(
conn
->
pWriter
,
(
uv_stream_t
*
)
conn
->
pTcp
,
&
wb
,
1
,
uvOnWriteCb
);
}
}
}
}
...
...
source/libs/transport/test/rclient.c
浏览文件 @
eb2eb6a5
...
@@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
...
@@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
rpcFreeCont
(
pMsg
->
pCont
);
rpcFreeCont
(
pMsg
->
pCont
);
// tsem_post(&pInfo->rspSem);
tsem_post
(
&
pInfo
->
rspSem
);
tsem_post
(
&
pInfo
->
rspSem
);
}
}
...
@@ -60,6 +61,7 @@ static void *sendRequest(void *param) {
...
@@ -60,6 +61,7 @@ static void *sendRequest(void *param) {
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest
(
pInfo
->
pRpc
,
&
pInfo
->
epSet
,
&
rpcMsg
,
NULL
);
rpcSendRequest
(
pInfo
->
pRpc
,
&
pInfo
->
epSet
,
&
rpcMsg
,
NULL
);
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
// tsem_wait(&pInfo->rspSem);
tsem_wait
(
&
pInfo
->
rspSem
);
tsem_wait
(
&
pInfo
->
rspSem
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录