Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0be19201
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0be19201
编写于
12月 27, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/qnode
上级
8febfa11
c5b42e57
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
249 addition
and
13 deletion
+249
-13
source/libs/index/CMakeLists.txt
source/libs/index/CMakeLists.txt
+2
-0
source/libs/index/inc/index_fst_automation.h
source/libs/index/inc/index_fst_automation.h
+1
-1
source/libs/index/src/index.c
source/libs/index/src/index.c
+0
-1
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+8
-6
source/libs/index/src/index_fst_automation.c
source/libs/index/src/index_fst_automation.c
+25
-1
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+22
-4
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+174
-0
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+17
-0
未找到文件。
source/libs/index/CMakeLists.txt
浏览文件 @
0be19201
...
...
@@ -3,7 +3,9 @@ add_library(index ${INDEX_SRC})
target_include_directories
(
index
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_link_libraries
(
index
...
...
source/libs/index/inc/index_fst_automation.h
浏览文件 @
0be19201
...
...
@@ -23,7 +23,7 @@ extern "C" {
typedef
struct
AutomationCtx
AutomationCtx
;
typedef
enum
AutomationType
{
AUTOMATION_PREFIX
,
AUTMMATION_MATCH
}
AutomationType
;
typedef
enum
AutomationType
{
AUTOMATION_
ALWAYS
,
AUTOMATION_
PREFIX
,
AUTMMATION_MATCH
}
AutomationType
;
typedef
struct
StartWith
{
AutomationCtx
*
autoSelf
;
...
...
source/libs/index/src/index.c
浏览文件 @
0be19201
...
...
@@ -355,7 +355,6 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
}
static
int
indexFlushCacheTFile
(
SIndex
*
sIdx
)
{
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
indexWarn
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
return
0
;
...
...
source/libs/index/src/index_fst.c
浏览文件 @
0be19201
...
...
@@ -1083,7 +1083,7 @@ bool fstBoundWithDataExceededBy(FstBoundWithData* bound, FstSlice* slice) {
}
else
if
(
bound
->
type
==
Excluded
)
{
return
comp
>=
0
?
true
:
false
;
}
else
{
return
tru
e
;
return
fals
e
;
}
}
bool
fstBoundWithDataIsEmpty
(
FstBoundWithData
*
bound
)
{
...
...
@@ -1224,7 +1224,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
void
*
start
=
automFuncs
[
aut
->
type
].
start
(
aut
);
if
(
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
start
))
{
FstSlice
s
=
fstSliceCreate
(
NULL
,
0
);
return
swsResultCreate
(
&
s
,
output
,
callback
(
start
));
return
swsResultCreate
(
&
s
,
output
,
callback
==
NULL
?
NULL
:
callback
(
start
));
}
}
SArray
*
nodes
=
taosArrayInit
(
8
,
sizeof
(
FstNode
*
));
...
...
@@ -1237,10 +1237,12 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
}
FstTransition
trn
;
fstNodeGetTransitionAt
(
p
->
node
,
p
->
trans
,
&
trn
);
Output
out
=
p
->
out
.
out
+
trn
.
out
;
void
*
nextState
=
automFuncs
[
aut
->
type
].
accept
(
aut
,
p
->
autState
,
trn
.
inp
);
void
*
tState
=
callback
(
nextState
);
bool
isMatch
=
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
nextState
);
Output
out
=
p
->
out
.
out
+
trn
.
out
;
void
*
nextState
=
automFuncs
[
aut
->
type
].
accept
(
aut
,
p
->
autState
,
trn
.
inp
);
void
*
tState
=
(
callback
==
NULL
)
?
NULL
:
callback
(
nextState
);
bool
isMatch
=
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
nextState
);
FstNode
*
nextNode
=
fstGetNode
(
sws
->
fst
,
trn
.
addr
);
taosArrayPush
(
nodes
,
&
nextNode
);
taosArrayPush
(
sws
->
inp
,
&
(
trn
.
inp
));
...
...
source/libs/index/src/index_fst_automation.c
浏览文件 @
0be19201
...
...
@@ -64,6 +64,25 @@ StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
return
nsv
;
}
// iterate fst
static
void
*
alwaysMatchStart
(
AutomationCtx
*
ctx
)
{
return
NULL
;
}
static
bool
alwaysMatchIsMatch
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
true
;
}
static
bool
alwaysMatchCanMatch
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
true
;
}
static
bool
alwaysMatchWillAlwaysMatch
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
true
;
}
static
void
*
alwaysMatchAccpet
(
AutomationCtx
*
ctx
,
void
*
state
,
uint8_t
byte
)
{
return
NULL
;
}
static
void
*
alwaysMatchAccpetEof
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
NULL
;
}
// prefix query, impl later
static
void
*
prefixStart
(
AutomationCtx
*
ctx
)
{
...
...
@@ -127,6 +146,7 @@ static void* patternAcceptEof(AutomationCtx* ctx, void* state) {
}
AutomationFunc
automFuncs
[]
=
{
{
alwaysMatchStart
,
alwaysMatchIsMatch
,
alwaysMatchCanMatch
,
alwaysMatchWillAlwaysMatch
,
alwaysMatchAccpet
,
alwaysMatchAccpetEof
},
{
prefixStart
,
prefixIsMatch
,
prefixCanMatch
,
prefixWillAlwaysMatch
,
prefixAccept
,
prefixAcceptEof
},
{
patternStart
,
patternIsMatch
,
patternCanMatch
,
patternWillAlwaysMatch
,
patternAccept
,
patternAcceptEof
}
// add more search type
...
...
@@ -137,7 +157,11 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
if
(
ctx
==
NULL
)
{
return
NULL
;
}
StartWithStateValue
*
sv
=
NULL
;
if
(
atype
==
AUTOMATION_PREFIX
)
{
if
(
atype
==
AUTOMATION_ALWAYS
)
{
int
val
=
0
;
sv
=
startWithStateValueCreate
(
Running
,
FST_INT
,
&
val
);
ctx
->
stdata
=
(
void
*
)
sv
;
}
else
if
(
atype
==
AUTOMATION_PREFIX
)
{
int
val
=
0
;
sv
=
startWithStateValueCreate
(
Running
,
FST_INT
,
&
val
);
ctx
->
stdata
=
(
void
*
)
sv
;
...
...
source/libs/index/test/CMakeLists.txt
浏览文件 @
0be19201
add_executable
(
indexTest
""
)
add_executable
(
fstTest
""
)
target_sources
(
indexTest
PRIVATE
"indexTests.cc"
)
target_sources
(
fstTest
PRIVATE
"fstTest.cc"
)
target_include_directories
(
indexTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
fstTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
indexTest
os
util
...
...
@@ -15,8 +25,16 @@ target_link_libraries (indexTest
gtest_main
index
)
add_test
(
NAME index_test
COMMAND indexTest
target_link_libraries
(
fstTest
os
util
common
gtest_main
index
)
#add_test(
# NAME index_test
# COMMAND indexTest
#)
source/libs/index/test/fstTest.cc
0 → 100644
浏览文件 @
0be19201
#include <iostream>
#include <string>
#include <vector>
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#include "index_tfile.h"
#include "tskiplist.h"
#include "tutil.h"
void
*
callback
(
void
*
s
)
{
return
s
;
}
static
std
::
string
fileName
=
"/tmp/tindex.tindex"
;
class
FstWriter
{
public:
FstWriter
()
{
remove
(
fileName
.
c_str
());
_wc
=
writerCtxCreate
(
TFile
,
fileName
.
c_str
(),
false
,
64
*
1024
*
1024
);
_b
=
fstBuilderCreate
(
_wc
,
0
);
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstBuilderInsert
(
_b
,
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
~
FstWriter
()
{
fstBuilderFinish
(
_b
);
fstBuilderDestroy
(
_b
);
writerCtxDestroy
(
_wc
);
}
private:
FstBuilder
*
_b
;
WriterCtx
*
_wc
;
};
class
FstReadMemory
{
public:
FstReadMemory
(
size_t
size
)
{
_wc
=
writerCtxCreate
(
TFile
,
fileName
.
c_str
(),
true
,
64
*
1024
);
_w
=
fstCountingWriterCreate
(
_wc
);
_size
=
size
;
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
}
bool
init
()
{
char
*
buf
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
_size
);
int
nRead
=
fstCountingWriterRead
(
_w
,
(
uint8_t
*
)
buf
,
_size
);
if
(
nRead
<=
0
)
{
return
false
;
}
_size
=
nRead
;
_s
=
fstSliceCreate
((
uint8_t
*
)
buf
,
_size
);
_fst
=
fstCreate
(
&
_s
);
free
(
buf
);
return
_fst
!=
NULL
;
}
bool
Get
(
const
std
::
string
&
key
,
uint64_t
*
val
)
{
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstGet
(
_fst
,
&
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
bool
GetWithTimeCostUs
(
const
std
::
string
&
key
,
uint64_t
*
val
,
uint64_t
*
elapse
)
{
int64_t
s
=
taosGetTimestampUs
();
bool
ok
=
this
->
Get
(
key
,
val
);
int64_t
e
=
taosGetTimestampUs
();
*
elapse
=
e
-
s
;
return
ok
;
}
// add later
bool
Search
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>&
result
)
{
FstStreamBuilder
*
sb
=
fstSearch
(
_fst
,
ctx
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
// result.push_back((uint64_t)(rt->out.out));
FstSlice
*
s
=
&
rt
->
data
;
int32_t
sz
=
0
;
char
*
ch
=
(
char
*
)
fstSliceData
(
s
,
&
sz
);
std
::
string
key
(
ch
,
sz
);
printf
(
"key: %s, val: %"
PRIu64
"
\n
"
,
key
.
c_str
(),
(
uint64_t
)(
rt
->
out
.
out
));
swsResultDestroy
(
rt
);
}
for
(
size_t
i
=
0
;
i
<
result
.
size
();
i
++
)
{}
std
::
cout
<<
std
::
endl
;
return
true
;
}
bool
SearchWithTimeCostUs
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>&
result
)
{
int64_t
s
=
taosGetTimestampUs
();
bool
ok
=
this
->
Search
(
ctx
,
result
);
int64_t
e
=
taosGetTimestampUs
();
return
ok
;
}
~
FstReadMemory
()
{
fstCountingWriterDestroy
(
_w
);
fstDestroy
(
_fst
);
fstSliceDestroy
(
&
_s
);
writerCtxDestroy
(
_wc
);
}
private:
FstCountingWriter
*
_w
;
Fst
*
_fst
;
FstSlice
_s
;
WriterCtx
*
_wc
;
size_t
_size
;
};
#define L 100
#define M 100
#define N 100
int
Performance_fstWriteRecords
(
FstWriter
*
b
)
{
std
::
string
str
(
"aa"
);
for
(
int
i
=
0
;
i
<
L
;
i
++
)
{
str
[
0
]
=
'a'
+
i
;
str
.
resize
(
2
);
for
(
int
j
=
0
;
j
<
M
;
j
++
)
{
str
[
1
]
=
'a'
+
j
;
str
.
resize
(
2
);
for
(
int
k
=
0
;
k
<
N
;
k
++
)
{
str
.
push_back
(
'a'
);
b
->
Put
(
str
,
k
);
printf
(
"(%d, %d, %d, %s)
\n
"
,
i
,
j
,
k
,
str
.
c_str
());
}
}
}
return
L
*
M
*
N
;
}
void
checkFstCheckIterator
()
{
tfInit
();
FstWriter
*
fw
=
new
FstWriter
;
int64_t
s
=
taosGetTimestampUs
();
int
count
=
2
;
Performance_fstWriteRecords
(
fw
);
int64_t
e
=
taosGetTimestampUs
();
std
::
cout
<<
"insert data count : "
<<
count
<<
"elapas time: "
<<
e
-
s
<<
std
::
endl
;
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
()
==
false
)
{
std
::
cout
<<
"init readMemory failed"
<<
std
::
endl
;
delete
m
;
return
;
}
// prefix search
std
::
vector
<
uint64_t
>
result
;
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
"ab"
,
AUTOMATION_ALWAYS
);
m
->
Search
(
ctx
,
result
);
std
::
cout
<<
"size: "
<<
result
.
size
()
<<
std
::
endl
;
// assert(result.size() == count);
for
(
int
i
=
0
;
i
<
result
.
size
();
i
++
)
{
// assert(result[i] == i); // check result
}
free
(
ctx
);
delete
m
;
tfCleanup
();
}
int
main
()
{
checkFstCheckIterator
();
// checkFstPrefixSearch();
return
1
;
}
source/libs/index/test/indexTests.cc
浏览文件 @
0be19201
...
...
@@ -481,6 +481,10 @@ class CacheObj {
}
return
ret
;
}
void
Debug
()
{
//
indexCacheDebug
(
cache
);
}
int
Get
(
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
)
{
int
ret
=
indexCacheSearch
(
cache
,
query
,
colId
,
version
,
result
,
s
);
if
(
ret
!=
0
)
{
...
...
@@ -515,6 +519,7 @@ class IndexCacheEnv : public ::testing::Test {
TEST_F
(
IndexCacheEnv
,
cache_test
)
{
int
version
=
0
;
int16_t
colId
=
0
;
int16_t
othColId
=
10
;
uint64_t
suid
=
0
;
std
::
string
colName
(
"voltage"
);
...
...
@@ -544,6 +549,16 @@ TEST_F(IndexCacheEnv, cache_test) {
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
}
{
std
::
string
colVal
(
"v3"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
othColId
,
version
++
,
suid
++
);
}
{
std
::
string
colVal
(
"v4"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
othColId
,
version
++
,
suid
++
);
}
{
std
::
string
colVal
(
"v4"
);
for
(
size_t
i
=
0
;
i
<
100
;
i
++
)
{
...
...
@@ -553,6 +568,8 @@ TEST_F(IndexCacheEnv, cache_test) {
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
}
}
coj
->
Debug
();
// begin query
{
std
::
string
colVal
(
"v3"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录