Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9233663a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9233663a
编写于
2月 22, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add index test U
上级
0c05a25f
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
602 addition
and
159 deletion
+602
-159
source/libs/index/src/index.c
source/libs/index/src/index.c
+42
-14
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+50
-19
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+134
-64
source/libs/index/src/index_fst_automation.c
source/libs/index/src/index_fst_automation.c
+18
-6
source/libs/index/src/index_fst_util.c
source/libs/index/src/index_fst_util.c
+5
-20
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+93
-33
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+20
-0
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+8
-3
source/libs/index/test/fstUT.cc
source/libs/index/test/fstUT.cc
+232
-0
未找到文件。
source/libs/index/src/index.c
浏览文件 @
9233663a
...
...
@@ -66,7 +66,9 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv);
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
pthread_once
(
&
isInit
,
indexInit
);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
#ifdef USE_LUCENE
index_t
*
index
=
index_open
(
path
);
...
...
@@ -76,7 +78,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#ifdef USE_INVERTED_INDEX
// sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx
->
tindex
=
indexTFileCreate
(
path
);
if
(
sIdx
->
tindex
==
NULL
)
{
goto
END
;
}
if
(
sIdx
->
tindex
==
NULL
)
{
goto
END
;
}
sIdx
->
colObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
cVersion
=
1
;
...
...
@@ -87,7 +91,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#endif
END:
if
(
sIdx
!=
NULL
)
{
indexClose
(
sIdx
);
}
if
(
sIdx
!=
NULL
)
{
indexClose
(
sIdx
);
}
*
index
=
NULL
;
return
-
1
;
...
...
@@ -103,7 +109,9 @@ void indexClose(SIndex* sIdx) {
void
*
iter
=
taosHashIterate
(
sIdx
->
colObj
,
NULL
);
while
(
iter
)
{
IndexCache
**
pCache
=
iter
;
if
(
*
pCache
)
{
indexCacheUnRef
(
*
pCache
);
}
if
(
*
pCache
)
{
indexCacheUnRef
(
*
pCache
);
}
iter
=
taosHashIterate
(
sIdx
->
colObj
,
iter
);
}
taosHashCleanup
(
sIdx
->
colObj
);
...
...
@@ -161,7 +169,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
assert
(
*
cache
!=
NULL
);
int
ret
=
indexCachePut
(
*
cache
,
p
,
uid
);
if
(
ret
!=
0
)
{
return
ret
;
}
if
(
ret
!=
0
)
{
return
ret
;
}
}
#endif
...
...
@@ -191,7 +201,9 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
int
tsz
=
0
;
index_multi_search
(
index
->
index
,
(
const
char
**
)
fields
,
(
const
char
**
)
keys
,
types
,
nQuery
,
opera
,
&
tResult
,
&
tsz
);
for
(
int
i
=
0
;
i
<
tsz
;
i
++
)
{
taosArrayPush
(
result
,
&
tResult
[
i
]);
}
for
(
int
i
=
0
;
i
<
tsz
;
i
++
)
{
taosArrayPush
(
result
,
&
tResult
[
i
]);
}
for
(
int
i
=
0
;
i
<
nQuery
;
i
++
)
{
free
(
fields
[
i
]);
...
...
@@ -248,7 +260,9 @@ void indexOptsDestroy(SIndexOpts* opts) {
*/
SIndexMultiTermQuery
*
indexMultiTermQueryCreate
(
EIndexOperatorType
opera
)
{
SIndexMultiTermQuery
*
p
=
(
SIndexMultiTermQuery
*
)
malloc
(
sizeof
(
SIndexMultiTermQuery
));
if
(
p
==
NULL
)
{
return
NULL
;
}
if
(
p
==
NULL
)
{
return
NULL
;
}
p
->
opera
=
opera
;
p
->
query
=
taosArrayInit
(
4
,
sizeof
(
SIndexTermQuery
));
return
p
;
...
...
@@ -270,7 +284,9 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
SIndexTerm
*
indexTermCreate
(
int64_t
suid
,
SIndexOperOnColumn
oper
,
uint8_t
colType
,
const
char
*
colName
,
int32_t
nColName
,
const
char
*
colVal
,
int32_t
nColVal
)
{
SIndexTerm
*
t
=
(
SIndexTerm
*
)
calloc
(
1
,
(
sizeof
(
SIndexTerm
)));
if
(
t
==
NULL
)
{
return
NULL
;
}
if
(
t
==
NULL
)
{
return
NULL
;
}
t
->
suid
=
suid
;
t
->
operType
=
oper
;
...
...
@@ -343,7 +359,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
return
0
;
}
static
void
indexInterResultsDestroy
(
SArray
*
results
)
{
if
(
results
==
NULL
)
{
return
;
}
if
(
results
==
NULL
)
{
return
;
}
size_t
sz
=
taosArrayGetSize
(
results
);
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -419,18 +437,24 @@ static void indexDestroyTempResult(SArray* result) {
taosArrayDestroy
(
result
);
}
int
indexFlushCacheToTFile
(
SIndex
*
sIdx
,
void
*
cache
)
{
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
indexInfo
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
int64_t
st
=
taosGetTimestampUs
();
IndexCache
*
pCache
=
(
IndexCache
*
)
cache
;
TFileReader
*
pReader
=
tfileGetReaderByCol
(
sIdx
->
tindex
,
pCache
->
suid
,
pCache
->
colName
);
if
(
pReader
==
NULL
)
{
indexWarn
(
"empty tfile reader found"
);
}
if
(
pReader
==
NULL
)
{
indexWarn
(
"empty tfile reader found"
);
}
// handle flush
Iterate
*
cacheIter
=
indexCacheIteratorCreate
(
pCache
);
Iterate
*
tfileIter
=
tfileIteratorCreate
(
pReader
);
if
(
tfileIter
==
NULL
)
{
indexWarn
(
"empty tfile reader iterator"
);
}
if
(
tfileIter
==
NULL
)
{
indexWarn
(
"empty tfile reader iterator"
);
}
SArray
*
result
=
taosArrayInit
(
1024
,
sizeof
(
void
*
));
...
...
@@ -484,7 +508,9 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
taosArrayDestroy
(
value
->
val
);
value
->
val
=
NULL
;
}
else
{
if
(
value
->
val
!=
NULL
)
{
taosArrayClear
(
value
->
val
);
}
if
(
value
->
val
!=
NULL
)
{
taosArrayClear
(
value
->
val
);
}
}
free
(
value
->
colVal
);
value
->
colVal
=
NULL
;
...
...
@@ -507,7 +533,9 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
tfileWriterClose
(
tw
);
TFileReader
*
reader
=
tfileReaderOpen
(
sIdx
->
path
,
cache
->
suid
,
version
,
cache
->
colName
);
if
(
reader
==
NULL
)
{
return
-
1
;
}
if
(
reader
==
NULL
)
{
return
-
1
;
}
TFileHeader
*
header
=
&
reader
->
header
;
ICacheKey
key
=
{.
suid
=
cache
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
)};
...
...
source/libs/index/src/index_cache.c
浏览文件 @
9233663a
...
...
@@ -119,13 +119,17 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
tSkipListDestroy
(
slt
);
}
void
indexCacheDestroyImm
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
if
(
cache
==
NULL
)
{
return
;
}
MemTable
*
tbl
=
NULL
;
pthread_mutex_lock
(
&
cache
->
mtx
);
tbl
=
cache
->
imm
;
cache
->
imm
=
NULL
;
// or throw int bg thread
pthread_cond_broadcast
(
&
cache
->
finished
);
pthread_mutex_unlock
(
&
cache
->
mtx
);
indexMemUnRef
(
tbl
);
...
...
@@ -133,7 +137,9 @@ void indexCacheDestroyImm(IndexCache* cache) {
}
void
indexCacheDestroy
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
if
(
pCache
==
NULL
)
{
return
;
}
if
(
pCache
==
NULL
)
{
return
;
}
indexMemUnRef
(
pCache
->
mem
);
indexMemUnRef
(
pCache
->
imm
);
free
(
pCache
->
colName
);
...
...
@@ -146,7 +152,9 @@ void indexCacheDestroy(void* cache) {
Iterate
*
indexCacheIteratorCreate
(
IndexCache
*
cache
)
{
Iterate
*
iiter
=
calloc
(
1
,
sizeof
(
Iterate
));
if
(
iiter
==
NULL
)
{
return
NULL
;
}
if
(
iiter
==
NULL
)
{
return
NULL
;
}
pthread_mutex_lock
(
&
cache
->
mtx
);
...
...
@@ -164,7 +172,9 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
return
iiter
;
}
void
indexCacheIteratorDestroy
(
Iterate
*
iter
)
{
if
(
iter
==
NULL
)
{
return
;
}
if
(
iter
==
NULL
)
{
return
;
}
tSkipListDestroyIter
(
iter
->
iter
);
iterateValueDestroy
(
&
iter
->
val
,
true
);
free
(
iter
);
...
...
@@ -186,9 +196,6 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
}
else
if
(
cache
->
imm
!=
NULL
)
{
// TODO: wake up by condition variable
pthread_cond_wait
(
&
cache
->
finished
,
&
cache
->
mtx
);
// pthread_mutex_unlock(&cache->mtx);
// taosMsleep(50);
// pthread_mutex_lock(&cache->mtx);
}
else
{
indexCacheRef
(
cache
);
cache
->
imm
=
cache
->
mem
;
...
...
@@ -202,13 +209,17 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
if
(
cache
==
NULL
)
{
return
-
1
;
}
IndexCache
*
pCache
=
cache
;
indexCacheRef
(
pCache
);
// encode data
CacheTerm
*
ct
=
calloc
(
1
,
sizeof
(
CacheTerm
));
if
(
cache
==
NULL
)
{
return
-
1
;
}
if
(
cache
==
NULL
)
{
return
-
1
;
}
// set up key
ct
->
colType
=
term
->
colType
;
ct
->
colVal
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
(
term
->
nColVal
+
1
));
...
...
@@ -240,7 +251,9 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
}
static
int
indexQueryMem
(
MemTable
*
mem
,
CacheTerm
*
ct
,
EIndexQueryType
qtype
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
mem
==
NULL
)
{
return
0
;
}
if
(
mem
==
NULL
)
{
return
0
;
}
char
*
key
=
indexCacheTermGet
(
ct
);
SSkipListIterator
*
iter
=
tSkipListCreateIterFromVal
(
mem
->
mem
,
key
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
...
...
@@ -266,7 +279,9 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
return
0
;
}
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
cache
==
NULL
)
{
return
0
;
}
if
(
cache
==
NULL
)
{
return
0
;
}
IndexCache
*
pCache
=
cache
;
MemTable
*
mem
=
NULL
,
*
imm
=
NULL
;
...
...
@@ -294,23 +309,33 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
}
void
indexCacheRef
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
if
(
cache
==
NULL
)
{
return
;
}
int
ref
=
T_REF_INC
(
cache
);
UNUSED
(
ref
);
}
void
indexCacheUnRef
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
if
(
cache
==
NULL
)
{
return
;
}
int
ref
=
T_REF_DEC
(
cache
);
if
(
ref
==
0
)
{
indexCacheDestroy
(
cache
);
}
if
(
ref
==
0
)
{
indexCacheDestroy
(
cache
);
}
}
void
indexMemRef
(
MemTable
*
tbl
)
{
if
(
tbl
==
NULL
)
{
return
;
}
if
(
tbl
==
NULL
)
{
return
;
}
int
ref
=
T_REF_INC
(
tbl
);
UNUSED
(
ref
);
}
void
indexMemUnRef
(
MemTable
*
tbl
)
{
if
(
tbl
==
NULL
)
{
return
;
}
if
(
tbl
==
NULL
)
{
return
;
}
int
ref
=
T_REF_DEC
(
tbl
);
if
(
ref
==
0
)
{
SSkipList
*
slt
=
tbl
->
mem
;
...
...
@@ -320,7 +345,9 @@ void indexMemUnRef(MemTable* tbl) {
}
static
void
indexCacheTermDestroy
(
CacheTerm
*
ct
)
{
if
(
ct
==
NULL
)
{
return
;
}
if
(
ct
==
NULL
)
{
return
;
}
free
(
ct
->
colVal
);
free
(
ct
);
}
...
...
@@ -333,7 +360,9 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
CacheTerm
*
rt
=
(
CacheTerm
*
)
r
;
// compare colVal
int32_t
cmp
=
strcmp
(
lt
->
colVal
,
rt
->
colVal
);
if
(
cmp
==
0
)
{
return
rt
->
version
-
lt
->
version
;
}
if
(
cmp
==
0
)
{
return
rt
->
version
-
lt
->
version
;
}
return
cmp
;
}
...
...
@@ -354,7 +383,9 @@ static void doMergeWork(SSchedMsg* msg) {
}
static
bool
indexCacheIteratorNext
(
Iterate
*
itera
)
{
SSkipListIterator
*
iter
=
itera
->
iter
;
if
(
iter
==
NULL
)
{
return
false
;
}
if
(
iter
==
NULL
)
{
return
false
;
}
IterateValue
*
iv
=
&
itera
->
val
;
iterateValueDestroy
(
iv
,
false
);
...
...
source/libs/index/src/index_fst.c
浏览文件 @
9233663a
...
...
@@ -31,20 +31,24 @@ static uint8_t fstPackDetla(FstCountingWriter* wrt, CompiledAddr nodeAddr, Compi
FstUnFinishedNodes
*
fstUnFinishedNodesCreate
()
{
FstUnFinishedNodes
*
nodes
=
malloc
(
sizeof
(
FstUnFinishedNodes
));
if
(
nodes
==
NULL
)
{
return
NULL
;
}
if
(
nodes
==
NULL
)
{
return
NULL
;
}
nodes
->
stack
=
(
SArray
*
)
taosArrayInit
(
64
,
sizeof
(
FstBuilderNodeUnfinished
));
fstUnFinishedNodesPushEmpty
(
nodes
,
false
);
return
nodes
;
}
void
unFinishedNodeDestroyElem
(
void
*
elem
)
{
static
void
unFinishedNodeDestroyElem
(
void
*
elem
)
{
FstBuilderNodeUnfinished
*
b
=
(
FstBuilderNodeUnfinished
*
)
elem
;
fstBuilderNodeDestroy
(
b
->
node
);
free
(
b
->
last
);
b
->
last
=
NULL
;
}
void
fstUnFinishedNodesDestroy
(
FstUnFinishedNodes
*
nodes
)
{
if
(
nodes
==
NULL
)
{
return
;
}
if
(
nodes
==
NULL
)
{
return
;
}
taosArrayDestroyEx
(
nodes
->
stack
,
unFinishedNodeDestroyElem
);
free
(
nodes
);
...
...
@@ -92,7 +96,9 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes* nodes, CompiledAddr add
}
void
fstUnFinishedNodesAddSuffix
(
FstUnFinishedNodes
*
nodes
,
FstSlice
bs
,
Output
out
)
{
FstSlice
*
s
=
&
bs
;
if
(
fstSliceIsEmpty
(
s
))
{
return
;
}
if
(
fstSliceIsEmpty
(
s
))
{
return
;
}
size_t
sz
=
taosArrayGetSize
(
nodes
->
stack
)
-
1
;
FstBuilderNodeUnfinished
*
un
=
taosArrayGet
(
nodes
->
stack
,
sz
);
assert
(
un
->
last
==
NULL
);
...
...
@@ -172,7 +178,9 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node,
FstState
fstStateCreateFrom
(
FstSlice
*
slice
,
CompiledAddr
addr
)
{
FstState
fs
=
{.
state
=
EmptyFinal
,
.
val
=
0
};
if
(
addr
==
EMPTY_ADDRESS
)
{
return
fs
;
}
if
(
addr
==
EMPTY_ADDRESS
)
{
return
fs
;
}
uint8_t
*
data
=
fstSliceData
(
slice
,
NULL
);
uint8_t
v
=
data
[
addr
];
...
...
@@ -229,7 +237,9 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
fstStateSetCommInput
(
&
st
,
trn
->
inp
);
bool
null
=
false
;
uint8_t
inp
=
fstStateCommInput
(
&
st
,
&
null
);
if
(
null
==
true
)
{
fstCountingWriterWrite
(
w
,
(
char
*
)
&
trn
->
inp
,
sizeof
(
trn
->
inp
));
}
if
(
null
==
true
)
{
fstCountingWriterWrite
(
w
,
(
char
*
)
&
trn
->
inp
,
sizeof
(
trn
->
inp
));
}
fstCountingWriterWrite
(
w
,
(
char
*
)(
&
(
st
.
val
)),
sizeof
(
st
.
val
));
return
;
}
...
...
@@ -263,7 +273,9 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
fstStateSetStateNtrans
(
&
st
,
(
uint8_t
)
sz
);
if
(
anyOuts
)
{
if
(
FST_BUILDER_NODE_IS_FINAL
(
node
))
{
fstCountingWriterPackUintIn
(
w
,
node
->
finalOutput
,
oSize
);
}
if
(
FST_BUILDER_NODE_IS_FINAL
(
node
))
{
fstCountingWriterPackUintIn
(
w
,
node
->
finalOutput
,
oSize
);
}
for
(
int32_t
i
=
sz
-
1
;
i
>=
0
;
i
--
)
{
FstTransition
*
t
=
taosArrayGet
(
node
->
trans
,
i
);
fstCountingWriterPackUintIn
(
w
,
t
->
out
,
oSize
);
...
...
@@ -428,7 +440,9 @@ Output fstStateOutput(FstState* s, FstNode* node) {
assert
(
s
->
state
==
OneTrans
);
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
node
->
sizes
);
if
(
oSizes
==
0
)
{
return
0
;
}
if
(
oSizes
==
0
)
{
return
0
;
}
FstSlice
*
slice
=
&
node
->
data
;
uint8_t
tSizes
=
FST_GET_TRANSITION_PACK_SIZE
(
node
->
sizes
);
...
...
@@ -440,7 +454,9 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
assert
(
s
->
state
==
AnyTrans
);
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
node
->
sizes
);
if
(
oSizes
==
0
)
{
return
0
;
}
if
(
oSizes
==
0
)
{
return
0
;
}
FstSlice
*
slice
=
&
node
->
data
;
uint8_t
*
data
=
fstSliceData
(
slice
,
NULL
);
uint64_t
at
=
node
->
start
-
fstStateNtransLen
(
s
)
-
1
// pack size
...
...
@@ -453,7 +469,9 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
void
fstStateSetFinalState
(
FstState
*
s
,
bool
yes
)
{
assert
(
s
->
state
==
AnyTrans
);
if
(
yes
)
{
s
->
val
|=
0
b01000000
;
}
if
(
yes
)
{
s
->
val
|=
0
b01000000
;
}
return
;
}
bool
fstStateIsFinalState
(
FstState
*
s
)
{
...
...
@@ -463,7 +481,9 @@ bool fstStateIsFinalState(FstState* s) {
void
fstStateSetStateNtrans
(
FstState
*
s
,
uint8_t
n
)
{
assert
(
s
->
state
==
AnyTrans
);
if
(
n
<=
0
b00111111
)
{
s
->
val
=
(
s
->
val
&
0
b11000000
)
|
n
;
}
if
(
n
<=
0
b00111111
)
{
s
->
val
=
(
s
->
val
&
0
b11000000
)
|
n
;
}
return
;
}
// state_ntrans
...
...
@@ -495,7 +515,9 @@ uint64_t fstStateNtransLen(FstState* s) {
uint64_t
fstStateNtrans
(
FstState
*
s
,
FstSlice
*
slice
)
{
bool
null
=
false
;
uint8_t
n
=
fstStateStateNtrans
(
s
,
&
null
);
if
(
null
!=
true
)
{
return
n
;
}
if
(
null
!=
true
)
{
return
n
;
}
int32_t
len
;
uint8_t
*
data
=
fstSliceData
(
slice
,
&
len
);
n
=
data
[
len
-
2
];
...
...
@@ -505,7 +527,9 @@ uint64_t fstStateNtrans(FstState* s, FstSlice* slice) {
}
Output
fstStateFinalOutput
(
FstState
*
s
,
uint64_t
version
,
FstSlice
*
slice
,
PackSizes
sizes
,
uint64_t
nTrans
)
{
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
sizes
);
if
(
oSizes
==
0
||
!
fstStateIsFinalState
(
s
))
{
return
0
;
}
if
(
oSizes
==
0
||
!
fstStateIsFinalState
(
s
))
{
return
0
;
}
uint64_t
at
=
FST_SLICE_LEN
(
slice
)
-
1
-
fstStateNtransLen
(
s
)
-
1
// pack size
-
fstStateTotalTransSize
(
s
,
version
,
sizes
,
nTrans
)
-
(
nTrans
*
oSizes
)
-
oSizes
;
...
...
@@ -522,7 +546,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) {
uint8_t
*
data
=
fstSliceData
(
slice
,
&
dlen
);
uint64_t
i
=
data
[
at
+
b
];
// uint64_t i = slice->data[slice->start + at + b];
if
(
i
>=
node
->
nTrans
)
{
*
null
=
true
;
}
if
(
i
>=
node
->
nTrans
)
{
*
null
=
true
;
}
return
i
;
}
else
{
uint64_t
start
=
node
->
start
-
fstStateNtransLen
(
s
)
-
1
// pack size
...
...
@@ -539,7 +565,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) {
return
node
->
nTrans
-
i
-
1
;
// bug
}
}
if
(
i
==
len
)
{
*
null
=
true
;
}
if
(
i
==
len
)
{
*
null
=
true
;
}
fstSliceDestroy
(
&
t
);
}
}
...
...
@@ -548,7 +576,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) {
FstNode
*
fstNodeCreate
(
int64_t
version
,
CompiledAddr
addr
,
FstSlice
*
slice
)
{
FstNode
*
n
=
(
FstNode
*
)
malloc
(
sizeof
(
FstNode
));
if
(
n
==
NULL
)
{
return
NULL
;
}
if
(
n
==
NULL
)
{
return
NULL
;
}
FstState
st
=
fstStateCreateFrom
(
slice
,
addr
);
...
...
@@ -614,7 +644,9 @@ void fstNodeDestroy(FstNode* node) {
}
FstTransitions
*
fstNodeTransitions
(
FstNode
*
node
)
{
FstTransitions
*
t
=
malloc
(
sizeof
(
FstTransitions
));
if
(
NULL
==
t
)
{
return
NULL
;
}
if
(
NULL
==
t
)
{
return
NULL
;
}
FstRange
range
=
{.
start
=
0
,
.
end
=
FST_NODE_LEN
(
node
)};
t
->
range
=
range
;
t
->
node
=
node
;
...
...
@@ -721,7 +753,9 @@ bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, Compiled
FstBuilder
*
fstBuilderCreate
(
void
*
w
,
FstType
ty
)
{
FstBuilder
*
b
=
malloc
(
sizeof
(
FstBuilder
));
if
(
NULL
==
b
)
{
return
b
;
}
if
(
NULL
==
b
)
{
return
b
;
}
b
->
wrt
=
fstCountingWriterCreate
(
w
);
b
->
unfinished
=
fstUnFinishedNodesCreate
();
...
...
@@ -735,15 +769,17 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
taosEncodeFixedU64
(
&
pBuf64
,
VERSION
);
fstCountingWriterWrite
(
b
->
wrt
,
buf64
,
sizeof
(
buf64
));
memset
(
buf64
,
0
,
sizeof
(
buf64
));
pBuf64
=
buf64
;
memset
(
buf64
,
0
,
sizeof
(
buf64
));
taosEncodeFixedU64
(
&
pBuf64
,
ty
);
fstCountingWriterWrite
(
b
->
wrt
,
buf64
,
sizeof
(
buf64
));
return
b
;
}
void
fstBuilderDestroy
(
FstBuilder
*
b
)
{
if
(
b
==
NULL
)
{
return
;
}
if
(
b
==
NULL
)
{
return
;
}
fstCountingWriterDestroy
(
b
->
wrt
);
fstUnFinishedNodesDestroy
(
b
->
unfinished
);
...
...
@@ -830,6 +866,7 @@ void fstBuilderCompileFrom(FstBuilder* b, uint64_t istate) {
fstUnFinishedNodesTopLastFreeze
(
b
->
unfinished
,
addr
);
return
;
}
CompiledAddr
fstBuilderCompile
(
FstBuilder
*
b
,
FstBuilderNode
*
bn
)
{
if
(
FST_BUILDER_NODE_IS_FINAL
(
bn
)
&&
FST_BUILDER_NODE_TRANS_ISEMPTY
(
bn
)
&&
FST_BUILDER_NODE_FINALOUTPUT_ISZERO
(
bn
))
{
return
EMPTY_ADDRESS
;
...
...
@@ -844,7 +881,9 @@ CompiledAddr fstBuilderCompile(FstBuilder* b, FstBuilderNode* bn) {
fstBuilderNodeCompileTo
(
bn
,
b
->
wrt
,
b
->
lastAddr
,
startAddr
);
b
->
lastAddr
=
(
CompiledAddr
)(
FST_WRITER_COUNT
(
b
->
wrt
)
-
1
);
if
(
entry
->
state
==
NOTFOUND
)
{
FST_REGISTRY_CELL_INSERT
(
entry
->
cell
,
b
->
lastAddr
);
}
if
(
entry
->
state
==
NOTFOUND
)
{
FST_REGISTRY_CELL_INSERT
(
entry
->
cell
,
b
->
lastAddr
);
}
fstRegistryEntryDestroy
(
entry
);
return
b
->
lastAddr
;
...
...
@@ -887,7 +926,9 @@ FstSlice fstNodeAsSlice(FstNode* node) {
FstLastTransition
*
fstLastTransitionCreate
(
uint8_t
inp
,
Output
out
)
{
FstLastTransition
*
trn
=
malloc
(
sizeof
(
FstLastTransition
));
if
(
trn
==
NULL
)
{
return
NULL
;
}
if
(
trn
==
NULL
)
{
return
NULL
;
}
trn
->
inp
=
inp
;
trn
->
out
=
out
;
...
...
@@ -897,7 +938,9 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) {
void
fstLastTransitionDestroy
(
FstLastTransition
*
trn
)
{
free
(
trn
);
}
void
fstBuilderNodeUnfinishedLastCompiled
(
FstBuilderNodeUnfinished
*
unNode
,
CompiledAddr
addr
)
{
FstLastTransition
*
trn
=
unNode
->
last
;
if
(
trn
==
NULL
)
{
return
;
}
if
(
trn
==
NULL
)
{
return
;
}
FstTransition
t
=
{.
inp
=
trn
->
inp
,
.
out
=
trn
->
out
,
.
addr
=
addr
};
taosArrayPush
(
unNode
->
node
->
trans
,
&
t
);
fstLastTransitionDestroy
(
trn
);
...
...
@@ -906,27 +949,35 @@ void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, Comp
}
void
fstBuilderNodeUnfinishedAddOutputPrefix
(
FstBuilderNodeUnfinished
*
unNode
,
Output
out
)
{
if
(
FST_BUILDER_NODE_IS_FINAL
(
unNode
->
node
))
{
unNode
->
node
->
finalOutput
+=
out
;
}
if
(
FST_BUILDER_NODE_IS_FINAL
(
unNode
->
node
))
{
unNode
->
node
->
finalOutput
+=
out
;
}
size_t
sz
=
taosArrayGetSize
(
unNode
->
node
->
trans
);
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
FstTransition
*
trn
=
taosArrayGet
(
unNode
->
node
->
trans
,
i
);
trn
->
out
+=
out
;
}
if
(
unNode
->
last
)
{
unNode
->
last
->
out
+=
out
;
}
if
(
unNode
->
last
)
{
unNode
->
last
->
out
+=
out
;
}
return
;
}
Fst
*
fstCreate
(
FstSlice
*
slice
)
{
int32_t
slen
;
char
*
buf
=
fstSliceData
(
slice
,
&
slen
);
if
(
slen
<
36
)
{
return
NULL
;
}
if
(
slen
<
36
)
{
return
NULL
;
}
uint64_t
len
=
slen
;
uint64_t
skip
=
0
;
uint64_t
version
;
taosDecodeFixedU64
(
buf
,
&
version
);
skip
+=
sizeof
(
version
);
if
(
version
==
0
||
version
>
VERSION
)
{
return
NULL
;
}
if
(
version
==
0
||
version
>
VERSION
)
{
return
NULL
;
}
uint64_t
type
;
taosDecodeFixedU64
(
buf
+
skip
,
&
type
);
...
...
@@ -949,10 +1000,14 @@ Fst* fstCreate(FstSlice* slice) {
taosDecodeFixedU64
(
buf
+
len
,
&
fstLen
);
// TODO(validate root addr)
Fst
*
fst
=
(
Fst
*
)
calloc
(
1
,
sizeof
(
Fst
));
if
(
fst
==
NULL
)
{
return
NULL
;
}
if
(
fst
==
NULL
)
{
return
NULL
;
}
fst
->
meta
=
(
FstMeta
*
)
malloc
(
sizeof
(
FstMeta
));
if
(
NULL
==
fst
->
meta
)
{
goto
FST_CREAT_FAILED
;
}
if
(
NULL
==
fst
->
meta
)
{
goto
FST_CREAT_FAILED
;
}
fst
->
meta
->
version
=
version
;
fst
->
meta
->
rootAddr
=
rootAddr
;
...
...
@@ -983,7 +1038,7 @@ void fstDestroy(Fst* fst) {
bool
fstGet
(
Fst
*
fst
,
FstSlice
*
b
,
Output
*
out
)
{
// dec lock range
pthread_mutex_lock
(
&
fst
->
mtx
);
//
pthread_mutex_lock(&fst->mtx);
FstNode
*
root
=
fstGetRoot
(
fst
);
Output
tOut
=
0
;
int32_t
len
;
...
...
@@ -996,7 +1051,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
uint8_t
inp
=
data
[
i
];
Output
res
=
0
;
if
(
false
==
fstNodeFindInput
(
root
,
inp
,
&
res
))
{
pthread_mutex_unlock
(
&
fst
->
mtx
);
//
pthread_mutex_unlock(&fst->mtx);
return
false
;
}
...
...
@@ -1007,7 +1062,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
taosArrayPush
(
nodes
,
&
root
);
}
if
(
!
FST_NODE_IS_FINAL
(
root
))
{
pthread_mutex_unlock
(
&
fst
->
mtx
);
//
pthread_mutex_unlock(&fst->mtx);
return
false
;
}
else
{
tOut
=
tOut
+
FST_NODE_FINAL_OUTPUT
(
root
);
...
...
@@ -1018,8 +1073,8 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
fstNodeDestroy
(
*
node
);
}
taosArrayDestroy
(
nodes
);
fst
->
root
=
NULL
;
pthread_mutex_unlock
(
&
fst
->
mtx
);
//
fst->root = NULL;
//
pthread_mutex_unlock(&fst->mtx);
*
out
=
tOut
;
return
true
;
}
...
...
@@ -1028,7 +1083,9 @@ FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) {
return
fstStreamBuilderCreate
(
fst
,
ctx
);
}
StreamWithState
*
streamBuilderIntoStream
(
FstStreamBuilder
*
sb
)
{
if
(
sb
==
NULL
)
{
return
NULL
;
}
if
(
sb
==
NULL
)
{
return
NULL
;
}
return
streamWithStateCreate
(
sb
->
fst
,
sb
->
aut
,
sb
->
min
,
sb
->
max
);
}
FstStreamWithStateBuilder
*
fstSearchWithState
(
Fst
*
fst
,
AutomationCtx
*
ctx
)
{
...
...
@@ -1039,15 +1096,6 @@ FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
FstNode
*
fstGetRoot
(
Fst
*
fst
)
{
CompiledAddr
rAddr
=
fstGetRootAddr
(
fst
);
return
fstGetNode
(
fst
,
rAddr
);
// pthread_mutex_lock(&fst->mtx);
// if (fst->root != NULL) {
// // pthread_mutex_unlock(&fst->mtx);
// return fst->root;
//}
// CompiledAddr rAddr = fstGetRootAddr(fst);
// fst->root = fstGetNode(fst, rAddr);
//// pthread_mutex_unlock(&fst->mtx);
// return fst->root;
}
FstNode
*
fstGetNode
(
Fst
*
fst
,
CompiledAddr
addr
)
{
...
...
@@ -1074,14 +1122,18 @@ bool fstVerify(Fst* fst) {
uint32_t
len
,
checkSum
=
fst
->
meta
->
checkSum
;
uint8_t
*
data
=
fstSliceData
(
fst
->
data
,
&
len
);
TSCKSUM
initSum
=
0
;
if
(
!
taosCheckChecksumWhole
(
data
,
len
))
{
return
false
;
}
if
(
!
taosCheckChecksumWhole
(
data
,
len
))
{
return
false
;
}
return
true
;
}
// data bound function
FstBoundWithData
*
fstBoundStateCreate
(
FstBound
type
,
FstSlice
*
data
)
{
FstBoundWithData
*
b
=
calloc
(
1
,
sizeof
(
FstBoundWithData
));
if
(
b
==
NULL
)
{
return
NULL
;
}
if
(
b
==
NULL
)
{
return
NULL
;
}
if
(
data
!=
NULL
)
{
b
->
data
=
fstSliceCopy
(
data
,
data
->
start
,
data
->
end
);
...
...
@@ -1118,7 +1170,9 @@ void fstBoundDestroy(FstBoundWithData* bound) { free(bound); }
StreamWithState
*
streamWithStateCreate
(
Fst
*
fst
,
AutomationCtx
*
automation
,
FstBoundWithData
*
min
,
FstBoundWithData
*
max
)
{
StreamWithState
*
sws
=
calloc
(
1
,
sizeof
(
StreamWithState
));
if
(
sws
==
NULL
)
{
return
NULL
;
}
if
(
sws
==
NULL
)
{
return
NULL
;
}
sws
->
fst
=
fst
;
sws
->
aut
=
automation
;
...
...
@@ -1134,7 +1188,9 @@ StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstB
return
sws
;
}
void
streamWithStateDestroy
(
StreamWithState
*
sws
)
{
if
(
sws
==
NULL
)
{
return
;
}
if
(
sws
==
NULL
)
{
return
;
}
taosArrayDestroy
(
sws
->
inp
);
taosArrayDestroyEx
(
sws
->
stack
,
streamStateDestroy
);
...
...
@@ -1200,7 +1256,9 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
uint64_t
i
=
0
;
for
(
i
=
trans
->
range
.
start
;
i
<
trans
->
range
.
end
;
i
++
)
{
FstTransition
trn
;
if
(
fstNodeGetTransitionAt
(
node
,
i
,
&
trn
)
&&
trn
.
inp
>
b
)
{
break
;
}
if
(
fstNodeGetTransitionAt
(
node
,
i
,
&
trn
)
&&
trn
.
inp
>
b
)
{
break
;
}
}
StreamState
s
=
{.
node
=
node
,
.
trans
=
i
,
.
out
=
{.
null
=
false
,
.
out
=
out
},
.
autState
=
autState
};
...
...
@@ -1248,7 +1306,9 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
while
(
taosArrayGetSize
(
sws
->
stack
)
>
0
)
{
StreamState
*
p
=
(
StreamState
*
)
taosArrayPop
(
sws
->
stack
);
if
(
p
->
trans
>=
FST_NODE_LEN
(
p
->
node
)
||
!
automFuncs
[
aut
->
type
].
canMatch
(
aut
,
p
->
autState
))
{
if
(
FST_NODE_ADDR
(
p
->
node
)
!=
fstGetRootAddr
(
sws
->
fst
))
{
taosArrayPop
(
sws
->
inp
);
}
if
(
FST_NODE_ADDR
(
p
->
node
)
!=
fstGetRootAddr
(
sws
->
fst
))
{
taosArrayPop
(
sws
->
inp
);
}
streamStateDestroy
(
p
);
continue
;
}
...
...
@@ -1267,7 +1327,9 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
if
(
FST_NODE_IS_FINAL
(
nextNode
))
{
// void *eofState = sws->aut->acceptEof(nextState);
void
*
eofState
=
automFuncs
[
aut
->
type
].
acceptEof
(
aut
,
nextState
);
if
(
eofState
!=
NULL
)
{
isMatch
=
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
eofState
);
}
if
(
eofState
!=
NULL
)
{
isMatch
=
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
eofState
);
}
}
StreamState
s1
=
{.
node
=
p
->
node
,
.
trans
=
p
->
trans
+
1
,
.
out
=
p
->
out
,
.
autState
=
p
->
autState
};
taosArrayPush
(
sws
->
stack
,
&
s1
);
...
...
@@ -1277,24 +1339,26 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
size_t
isz
=
taosArrayGetSize
(
sws
->
inp
);
uint8_t
*
buf
=
(
uint8_t
*
)
malloc
(
isz
*
sizeof
(
uint8_t
));
for
(
uint32_t
i
=
0
;
i
<
isz
;
i
++
)
{
buf
[
i
]
=
*
(
uint8_t
*
)
taosArrayGet
(
sws
->
inp
,
i
);
}
for
(
uint32_t
i
=
0
;
i
<
isz
;
i
++
)
{
buf
[
i
]
=
*
(
uint8_t
*
)
taosArrayGet
(
sws
->
inp
,
i
);
}
FstSlice
slice
=
fstSliceCreate
(
buf
,
taosArrayGetSize
(
sws
->
inp
));
if
(
fstBoundWithDataExceededBy
(
sws
->
endAt
,
&
slice
))
{
taosArrayDestroyEx
(
sws
->
stack
,
streamStateDestroy
);
sws
->
stack
=
(
SArray
*
)
taosArrayInit
(
256
,
sizeof
(
StreamState
));
free
(
buf
);
t
free
(
buf
);
fstSliceDestroy
(
&
slice
);
return
NULL
;
}
if
(
FST_NODE_IS_FINAL
(
nextNode
)
&&
isMatch
)
{
FstOutput
fOutput
=
{.
null
=
false
,
.
out
=
out
+
FST_NODE_FINAL_OUTPUT
(
nextNode
)};
StreamWithStateResult
*
result
=
swsResultCreate
(
&
slice
,
fOutput
,
tState
);
free
(
buf
);
t
free
(
buf
);
fstSliceDestroy
(
&
slice
);
taosArrayDestroy
(
nodes
);
return
result
;
}
free
(
buf
);
t
free
(
buf
);
fstSliceDestroy
(
&
slice
);
}
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
nodes
);
i
++
)
{
...
...
@@ -1307,16 +1371,19 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
StreamWithStateResult
*
swsResultCreate
(
FstSlice
*
data
,
FstOutput
fOut
,
void
*
state
)
{
StreamWithStateResult
*
result
=
calloc
(
1
,
sizeof
(
StreamWithStateResult
));
if
(
result
==
NULL
)
{
return
NULL
;
}
if
(
result
==
NULL
)
{
return
NULL
;
}
result
->
data
=
fstSliceCopy
(
data
,
0
,
FST_SLICE_LEN
(
data
)
-
1
);
result
->
out
=
fOut
;
result
->
state
=
state
;
return
result
;
}
void
swsResultDestroy
(
StreamWithStateResult
*
result
)
{
if
(
NULL
==
result
)
{
return
;
}
if
(
NULL
==
result
)
{
return
;
}
fstSliceDestroy
(
&
result
->
data
);
startWithStateValueDestroy
(
result
->
state
);
...
...
@@ -1324,16 +1391,18 @@ void swsResultDestroy(StreamWithStateResult* result) {
}
void
streamStateDestroy
(
void
*
s
)
{
if
(
NULL
==
s
)
{
return
;
}
if
(
NULL
==
s
)
{
return
;
}
StreamState
*
ss
=
(
StreamState
*
)
s
;
fstNodeDestroy
(
ss
->
node
);
// free(s->autoState);
}
FstStreamBuilder
*
fstStreamBuilderCreate
(
Fst
*
fst
,
AutomationCtx
*
aut
)
{
FstStreamBuilder
*
b
=
calloc
(
1
,
sizeof
(
FstStreamBuilder
));
if
(
NULL
==
b
)
{
return
NULL
;
}
if
(
NULL
==
b
)
{
return
NULL
;
}
b
->
fst
=
fst
;
b
->
aut
=
aut
;
...
...
@@ -1349,8 +1418,9 @@ void fstStreamBuilderDestroy(FstStreamBuilder* b) {
free
(
b
);
}
FstStreamBuilder
*
fstStreamBuilderRange
(
FstStreamBuilder
*
b
,
FstSlice
*
val
,
RangeType
type
)
{
if
(
b
==
NULL
)
{
return
NULL
;
}
if
(
b
==
NULL
)
{
return
NULL
;
}
if
(
type
==
GE
)
{
b
->
min
->
type
=
Included
;
fstSliceDestroy
(
&
(
b
->
min
->
data
));
...
...
source/libs/index/src/index_fst_automation.c
浏览文件 @
9233663a
...
...
@@ -17,7 +17,9 @@
StartWithStateValue
*
startWithStateValueCreate
(
StartWithStateKind
kind
,
ValueType
ty
,
void
*
val
)
{
StartWithStateValue
*
nsv
=
calloc
(
1
,
sizeof
(
StartWithStateValue
));
if
(
nsv
==
NULL
)
{
return
NULL
;
}
if
(
nsv
==
NULL
)
{
return
NULL
;
}
nsv
->
kind
=
kind
;
nsv
->
type
=
ty
;
...
...
@@ -35,7 +37,9 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp
}
void
startWithStateValueDestroy
(
void
*
val
)
{
StartWithStateValue
*
sv
=
(
StartWithStateValue
*
)
val
;
if
(
sv
==
NULL
)
{
return
;
}
if
(
sv
==
NULL
)
{
return
;
}
if
(
sv
->
type
==
FST_INT
)
{
//
...
...
@@ -48,7 +52,9 @@ void startWithStateValueDestroy(void* val) {
}
StartWithStateValue
*
startWithStateValueDump
(
StartWithStateValue
*
sv
)
{
StartWithStateValue
*
nsv
=
calloc
(
1
,
sizeof
(
StartWithStateValue
));
if
(
nsv
==
NULL
)
{
return
NULL
;
}
if
(
nsv
==
NULL
)
{
return
NULL
;
}
nsv
->
kind
=
sv
->
kind
;
nsv
->
type
=
sv
->
type
;
...
...
@@ -88,10 +94,14 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) {
static
bool
prefixWillAlwaysMatch
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
true
;
}
static
void
*
prefixAccept
(
AutomationCtx
*
ctx
,
void
*
state
,
uint8_t
byte
)
{
StartWithStateValue
*
ssv
=
(
StartWithStateValue
*
)
state
;
if
(
ssv
==
NULL
||
ctx
==
NULL
)
{
return
NULL
;
}
if
(
ssv
==
NULL
||
ctx
==
NULL
)
{
return
NULL
;
}
char
*
data
=
ctx
->
data
;
if
(
ssv
->
kind
==
Done
)
{
return
startWithStateValueCreate
(
Done
,
FST_INT
,
&
ssv
->
val
);
}
if
(
ssv
->
kind
==
Done
)
{
return
startWithStateValueCreate
(
Done
,
FST_INT
,
&
ssv
->
val
);
}
if
((
strlen
(
data
)
>
ssv
->
val
)
&&
data
[
ssv
->
val
]
==
byte
)
{
int
val
=
ssv
->
val
+
1
;
...
...
@@ -128,7 +138,9 @@ AutomationFunc automFuncs[] = {
AutomationCtx
*
automCtxCreate
(
void
*
data
,
AutomationType
atype
)
{
AutomationCtx
*
ctx
=
calloc
(
1
,
sizeof
(
AutomationCtx
));
if
(
ctx
==
NULL
)
{
return
NULL
;
}
if
(
ctx
==
NULL
)
{
return
NULL
;
}
StartWithStateValue
*
sv
=
NULL
;
if
(
atype
==
AUTOMATION_ALWAYS
)
{
...
...
source/libs/index/src/index_fst_util.c
浏览文件 @
9233663a
...
...
@@ -29,18 +29,6 @@ const uint64_t VERSION = 3;
const
uint64_t
TRANS_INDEX_THRESHOLD
=
32
;
// uint8_t commonInput(uint8_t idx) {
// if (idx == 0) { return -1; }
// else {
// return COMMON_INPUTS_INV[idx - 1];
// }
//}
//
// uint8_t commonIdx(uint8_t v, uint8_t max) {
// uint8_t v = ((uint16_t)tCOMMON_INPUTS[v] + 1)%256;
// return v > max ? 0: v;
//}
uint8_t
packSize
(
uint64_t
n
)
{
if
(
n
<
(
1u
<<
8
))
{
return
1
;
...
...
@@ -103,9 +91,6 @@ FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
FstSlice
fstSliceCopy
(
FstSlice
*
s
,
int32_t
start
,
int32_t
end
)
{
FstString
*
str
=
s
->
str
;
str
->
ref
++
;
// uint8_t *buf = fstSliceData(s, &alen);
// start = buf + start - (buf - s->start);
// end = buf + end - (buf - s->start);
FstSlice
t
=
{.
str
=
str
,
.
start
=
start
+
s
->
start
,
.
end
=
end
+
s
->
start
};
return
t
;
...
...
@@ -130,19 +115,19 @@ FstSlice fstSliceDeepCopy(FstSlice* s, int32_t start, int32_t end) {
ans
.
end
=
tlen
-
1
;
return
ans
;
}
bool
fstSliceIsEmpty
(
FstSlice
*
s
)
{
return
s
->
str
==
NULL
||
s
->
str
->
len
==
0
||
s
->
start
<
0
||
s
->
end
<
0
;
}
bool
fstSliceIsEmpty
(
FstSlice
*
s
)
{
return
s
->
str
==
NULL
||
s
->
str
->
len
==
0
||
s
->
start
<
0
||
s
->
end
<
0
;
}
uint8_t
*
fstSliceData
(
FstSlice
*
s
,
int32_t
*
size
)
{
FstString
*
str
=
s
->
str
;
if
(
size
!=
NULL
)
{
*
size
=
s
->
end
-
s
->
start
+
1
;
}
if
(
size
!=
NULL
)
{
*
size
=
s
->
end
-
s
->
start
+
1
;
}
return
str
->
data
+
s
->
start
;
}
void
fstSliceDestroy
(
FstSlice
*
s
)
{
FstString
*
str
=
s
->
str
;
str
->
ref
--
;
if
(
str
->
ref
<
=
0
)
{
if
(
str
->
ref
=
=
0
)
{
free
(
str
->
data
);
free
(
str
);
s
->
str
=
NULL
;
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
9233663a
...
...
@@ -13,8 +13,6 @@ p *
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#include <sys/types.h>
//#include <dirent.h>
#include "index_tfile.h"
#include "index.h"
#include "index_fst.h"
...
...
@@ -61,7 +59,9 @@ static void tfileGenFileFullName(char* fullname, const char* path, uint64_t s
TFileCache
*
tfileCacheCreate
(
const
char
*
path
)
{
TFileCache
*
tcache
=
calloc
(
1
,
sizeof
(
TFileCache
));
if
(
tcache
==
NULL
)
{
return
NULL
;
}
if
(
tcache
==
NULL
)
{
return
NULL
;
}
tcache
->
tableCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
tcache
->
capacity
=
64
;
...
...
@@ -98,7 +98,9 @@ End:
return
NULL
;
}
void
tfileCacheDestroy
(
TFileCache
*
tcache
)
{
if
(
tcache
==
NULL
)
{
return
;
}
if
(
tcache
==
NULL
)
{
return
;
}
// free table cache
TFileReader
**
reader
=
taosHashIterate
(
tcache
->
tableCache
,
NULL
);
...
...
@@ -119,7 +121,9 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
int32_t
sz
=
indexSerialCacheKey
(
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
TFileReader
**
reader
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
sz
);
if
(
reader
==
NULL
)
{
return
NULL
;
}
if
(
reader
==
NULL
)
{
return
NULL
;
}
tfileReaderRef
(
*
reader
);
return
*
reader
;
...
...
@@ -142,7 +146,9 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
}
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
)
{
TFileReader
*
reader
=
calloc
(
1
,
sizeof
(
TFileReader
));
if
(
reader
==
NULL
)
{
return
NULL
;
}
if
(
reader
==
NULL
)
{
return
NULL
;
}
reader
->
ctx
=
ctx
;
...
...
@@ -169,7 +175,9 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
return
reader
;
}
void
tfileReaderDestroy
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
;
}
if
(
reader
==
NULL
)
{
return
;
}
// T_REF_INC(reader);
fstDestroy
(
reader
->
fst
);
writerCtxDestroy
(
reader
->
ctx
,
reader
->
remove
);
...
...
@@ -209,7 +217,9 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
tfileGenFileFullName
(
fullname
,
path
,
suid
,
colName
,
version
);
// indexInfo("open write file name %s", fullname);
WriterCtx
*
wcx
=
writerCtxCreate
(
TFile
,
fullname
,
false
,
1024
*
1024
*
64
);
if
(
wcx
==
NULL
)
{
return
NULL
;
}
if
(
wcx
==
NULL
)
{
return
NULL
;
}
TFileHeader
tfh
=
{
0
};
tfh
.
suid
=
suid
;
...
...
@@ -225,7 +235,9 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
WriterCtx
*
wc
=
writerCtxCreate
(
TFile
,
fullname
,
true
,
1024
*
1024
*
1024
);
indexInfo
(
"open read file name:%s, size: %d"
,
wc
->
file
.
buf
,
wc
->
file
.
size
);
if
(
wc
==
NULL
)
{
return
NULL
;
}
if
(
wc
==
NULL
)
{
return
NULL
;
}
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
return
reader
;
...
...
@@ -316,19 +328,25 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
return
0
;
}
void
tfileWriterClose
(
TFileWriter
*
tw
)
{
if
(
tw
==
NULL
)
{
return
;
}
if
(
tw
==
NULL
)
{
return
;
}
writerCtxDestroy
(
tw
->
ctx
,
false
);
free
(
tw
);
}
void
tfileWriterDestroy
(
TFileWriter
*
tw
)
{
if
(
tw
==
NULL
)
{
return
;
}
if
(
tw
==
NULL
)
{
return
;
}
writerCtxDestroy
(
tw
->
ctx
,
false
);
free
(
tw
);
}
IndexTFile
*
indexTFileCreate
(
const
char
*
path
)
{
TFileCache
*
cache
=
tfileCacheCreate
(
path
);
if
(
cache
==
NULL
)
{
return
NULL
;
}
if
(
cache
==
NULL
)
{
return
NULL
;
}
IndexTFile
*
tfile
=
calloc
(
1
,
sizeof
(
IndexTFile
));
if
(
tfile
==
NULL
)
{
...
...
@@ -340,21 +358,27 @@ IndexTFile* indexTFileCreate(const char* path) {
return
tfile
;
}
void
indexTFileDestroy
(
IndexTFile
*
tfile
)
{
if
(
tfile
==
NULL
)
{
return
;
}
if
(
tfile
==
NULL
)
{
return
;
}
tfileCacheDestroy
(
tfile
->
cache
);
free
(
tfile
);
}
int
indexTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
int
ret
=
-
1
;
if
(
tfile
==
NULL
)
{
return
ret
;
}
if
(
tfile
==
NULL
)
{
return
ret
;
}
IndexTFile
*
pTfile
=
(
IndexTFile
*
)
tfile
;
SIndexTerm
*
term
=
query
->
term
;
ICacheKey
key
=
{.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
TFileReader
*
reader
=
tfileCacheGet
(
pTfile
->
cache
,
&
key
);
if
(
reader
==
NULL
)
{
return
0
;
}
if
(
reader
==
NULL
)
{
return
0
;
}
return
tfileReaderSearch
(
reader
,
query
,
result
);
}
...
...
@@ -373,7 +397,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
TFileFstIter
*
tIter
=
iiter
->
iter
;
StreamWithStateResult
*
rt
=
streamWithStateNextWith
(
tIter
->
st
,
NULL
);
if
(
rt
==
NULL
)
{
return
false
;
}
if
(
rt
==
NULL
)
{
return
false
;
}
int32_t
sz
=
0
;
char
*
ch
=
(
char
*
)
fstSliceData
(
&
rt
->
data
,
&
sz
);
...
...
@@ -383,7 +409,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
offset
=
(
uint64_t
)(
rt
->
out
.
out
);
swsResultDestroy
(
rt
);
// set up iterate value
if
(
tfileReaderLoadTableIds
(
tIter
->
rdr
,
offset
,
iv
->
val
)
!=
0
)
{
return
false
;
}
if
(
tfileReaderLoadTableIds
(
tIter
->
rdr
,
offset
,
iv
->
val
)
!=
0
)
{
return
false
;
}
iv
->
colVal
=
colVal
;
return
true
;
...
...
@@ -394,7 +422,9 @@ static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
static
TFileFstIter
*
tfileFstIteratorCreate
(
TFileReader
*
reader
)
{
TFileFstIter
*
tIter
=
calloc
(
1
,
sizeof
(
TFileFstIter
));
if
(
tIter
==
NULL
)
{
return
NULL
;
}
if
(
tIter
==
NULL
)
{
return
NULL
;
}
tIter
->
ctx
=
automCtxCreate
(
NULL
,
AUTOMATION_ALWAYS
);
tIter
->
fb
=
fstSearch
(
reader
->
fst
,
tIter
->
ctx
);
...
...
@@ -404,7 +434,9 @@ static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
}
Iterate
*
tfileIteratorCreate
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
NULL
;
}
if
(
reader
==
NULL
)
{
return
NULL
;
}
Iterate
*
iter
=
calloc
(
1
,
sizeof
(
Iterate
));
iter
->
iter
=
tfileFstIteratorCreate
(
reader
);
...
...
@@ -419,7 +451,9 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
return
iter
;
}
void
tfileIteratorDestroy
(
Iterate
*
iter
)
{
if
(
iter
==
NULL
)
{
return
;
}
if
(
iter
==
NULL
)
{
return
;
}
IterateValue
*
iv
=
&
iter
->
val
;
iterateValueDestroy
(
iv
,
true
);
...
...
@@ -434,7 +468,9 @@ void tfileIteratorDestroy(Iterate* iter) {
}
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
uint64_t
suid
,
char
*
colName
)
{
if
(
tf
==
NULL
)
{
return
NULL
;
}
if
(
tf
==
NULL
)
{
return
NULL
;
}
ICacheKey
key
=
{.
suid
=
suid
,
.
colType
=
TSDB_DATA_TYPE_BINARY
,
.
colName
=
colName
,
.
nColName
=
strlen
(
colName
)};
return
tfileCacheGet
(
tf
->
cache
,
&
key
);
}
...
...
@@ -446,7 +482,9 @@ static int tfileUidCompare(const void* a, const void* b) {
}
static
int
tfileStrCompare
(
const
void
*
a
,
const
void
*
b
)
{
int
ret
=
strcmp
((
char
*
)
a
,
(
char
*
)
b
);
if
(
ret
==
0
)
{
return
ret
;
}
if
(
ret
==
0
)
{
return
ret
;
}
return
ret
<
0
?
-
1
:
1
;
}
...
...
@@ -461,13 +499,17 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
TFileValue
*
tfileValueCreate
(
char
*
val
)
{
TFileValue
*
tf
=
calloc
(
1
,
sizeof
(
TFileValue
));
if
(
tf
==
NULL
)
{
return
NULL
;
}
if
(
tf
==
NULL
)
{
return
NULL
;
}
tf
->
colVal
=
tstrdup
(
val
);
tf
->
tableId
=
taosArrayInit
(
32
,
sizeof
(
uint64_t
));
return
tf
;
}
int
tfileValuePush
(
TFileValue
*
tf
,
uint64_t
val
)
{
if
(
tf
==
NULL
)
{
return
-
1
;
}
if
(
tf
==
NULL
)
{
return
-
1
;
}
taosArrayPush
(
tf
->
tableId
,
&
val
);
return
0
;
}
...
...
@@ -489,7 +531,9 @@ static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t
fstOffset
=
offset
+
sizeof
(
tw
->
header
.
fstOffset
);
tw
->
header
.
fstOffset
=
fstOffset
;
if
(
sizeof
(
fstOffset
)
!=
tw
->
ctx
->
write
(
tw
->
ctx
,
(
char
*
)
&
fstOffset
,
sizeof
(
fstOffset
)))
{
return
-
1
;
}
if
(
sizeof
(
fstOffset
)
!=
tw
->
ctx
->
write
(
tw
->
ctx
,
(
char
*
)
&
fstOffset
,
sizeof
(
fstOffset
)))
{
return
-
1
;
}
indexInfo
(
"tfile write fst offset: %d"
,
tw
->
ctx
->
size
(
tw
->
ctx
));
tw
->
offset
+=
sizeof
(
fstOffset
);
return
0
;
...
...
@@ -502,7 +546,9 @@ static int tfileWriteHeader(TFileWriter* writer) {
indexInfo
(
"tfile pre write header size: %d"
,
writer
->
ctx
->
size
(
writer
->
ctx
));
int
nwrite
=
writer
->
ctx
->
write
(
writer
->
ctx
,
buf
,
sizeof
(
buf
));
if
(
sizeof
(
buf
)
!=
nwrite
)
{
return
-
1
;
}
if
(
sizeof
(
buf
)
!=
nwrite
)
{
return
-
1
;
}
indexInfo
(
"tfile after write header size: %d"
,
writer
->
ctx
->
size
(
writer
->
ctx
));
writer
->
offset
=
nwrite
;
...
...
@@ -556,7 +602,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
static
int
FST_MAX_SIZE
=
64
*
1024
*
1024
;
char
*
buf
=
calloc
(
1
,
sizeof
(
char
)
*
FST_MAX_SIZE
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
if
(
buf
==
NULL
)
{
return
-
1
;
}
WriterCtx
*
ctx
=
reader
->
ctx
;
int
size
=
ctx
->
size
(
ctx
);
...
...
@@ -586,12 +634,16 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
int32_t
total
=
sizeof
(
uint64_t
)
*
nid
;
char
*
buf
=
calloc
(
1
,
total
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
if
(
buf
==
NULL
)
{
return
-
1
;
}
nread
=
ctx
->
readFrom
(
ctx
,
buf
,
total
,
offset
+
sizeof
(
nid
));
assert
(
total
==
nread
);
for
(
int32_t
i
=
0
;
i
<
nid
;
i
++
)
{
taosArrayPush
(
result
,
(
uint64_t
*
)
buf
+
i
);
}
for
(
int32_t
i
=
0
;
i
<
nid
;
i
++
)
{
taosArrayPush
(
result
,
(
uint64_t
*
)
buf
+
i
);
}
free
(
buf
);
return
0
;
}
...
...
@@ -615,13 +667,17 @@ static int tfileReaderVerify(TFileReader* reader) {
}
void
tfileReaderRef
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
;
}
if
(
reader
==
NULL
)
{
return
;
}
int
ref
=
T_REF_INC
(
reader
);
UNUSED
(
ref
);
}
void
tfileReaderUnRef
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
;
}
if
(
reader
==
NULL
)
{
return
;
}
int
ref
=
T_REF_DEC
(
reader
);
if
(
ref
==
0
)
{
// do nothing
...
...
@@ -637,11 +693,15 @@ static SArray* tfileGetFileList(const char* path) {
uint32_t
version
;
DIR
*
dir
=
opendir
(
path
);
if
(
NULL
==
dir
)
{
return
NULL
;
}
if
(
NULL
==
dir
)
{
return
NULL
;
}
struct
dirent
*
entry
;
while
((
entry
=
readdir
(
dir
))
!=
NULL
)
{
char
*
file
=
entry
->
d_name
;
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
buf
,
&
version
))
{
continue
;
}
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
buf
,
&
version
))
{
continue
;
}
size_t
len
=
strlen
(
path
)
+
1
+
strlen
(
file
)
+
1
;
char
*
buf
=
calloc
(
1
,
len
);
...
...
source/libs/index/test/CMakeLists.txt
浏览文件 @
9233663a
add_executable
(
indexTest
""
)
add_executable
(
fstTest
""
)
add_executable
(
fstUT
""
)
target_sources
(
indexTest
PRIVATE
"indexTests.cc"
...
...
@@ -8,6 +10,11 @@ target_sources(fstTest
PRIVATE
"fstTest.cc"
)
target_sources
(
fstUT
PRIVATE
"fstUT.cc"
)
target_include_directories
(
indexTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
...
...
@@ -18,6 +25,12 @@ target_include_directories ( fstTest
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
fstUT
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
indexTest
os
util
...
...
@@ -32,6 +45,13 @@ target_link_libraries (fstTest
gtest_main
index
)
target_link_libraries
(
fstUT
os
util
common
gtest_main
index
)
#add_test(
...
...
source/libs/index/test/fstTest.cc
浏览文件 @
9233663a
...
...
@@ -58,7 +58,9 @@ class FstReadMemory {
bool
init
()
{
char
*
buf
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
_size
);
int
nRead
=
fstCountingWriterRead
(
_w
,
(
uint8_t
*
)
buf
,
_size
);
if
(
nRead
<=
0
)
{
return
false
;
}
if
(
nRead
<=
0
)
{
return
false
;
}
_size
=
nRead
;
_s
=
fstSliceCreate
((
uint8_t
*
)
buf
,
_size
);
_fst
=
fstCreate
(
&
_s
);
...
...
@@ -97,7 +99,8 @@ class FstReadMemory {
printf
(
"key: %s, val: %"
PRIu64
"
\n
"
,
key
.
c_str
(),
(
uint64_t
)(
rt
->
out
.
out
));
swsResultDestroy
(
rt
);
}
for
(
size_t
i
=
0
;
i
<
result
.
size
();
i
++
)
{}
for
(
size_t
i
=
0
;
i
<
result
.
size
();
i
++
)
{
}
std
::
cout
<<
std
::
endl
;
return
true
;
}
...
...
@@ -173,7 +176,9 @@ void checkMillonWriteAndReadOfFst() {
delete
fw
;
FstReadMemory
*
fr
=
new
FstReadMemory
(
1024
*
64
*
1024
);
if
(
fr
->
init
())
{
printf
(
"success to init fst read"
);
}
if
(
fr
->
init
())
{
printf
(
"success to init fst read"
);
}
Performance_fstReadRecords
(
fr
);
tfCleanup
();
...
...
source/libs/index/test/fstUT.cc
0 → 100644
浏览文件 @
9233663a
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#include "index_tfile.h"
#include "tglobal.h"
#include "tskiplist.h"
#include "tutil.h"
#include "ulog.h"
static
std
::
string
dir
=
"/tmp/index"
;
static
char
indexlog
[
PATH_MAX
]
=
{
0
};
static
char
tindex
[
PATH_MAX
]
=
{
0
};
static
char
tindexDir
[
PATH_MAX
]
=
{
0
};
static
void
EnvInit
()
{
tfInit
();
std
::
string
path
=
dir
;
taosRemoveDir
(
path
.
c_str
());
taosMkDir
(
path
.
c_str
());
// init log file
snprintf
(
indexlog
,
PATH_MAX
,
"%s/tindex.idx"
,
path
.
c_str
());
if
(
taosInitLog
(
indexlog
,
tsNumOfLogLines
,
1
)
!=
0
)
{
printf
(
"failed to init log"
);
}
// init index file
memset
(
tindex
,
0
,
sizeof
(
tindex
));
snprintf
(
tindex
,
PATH_MAX
,
"%s/tindex.idx"
,
path
.
c_str
());
}
static
void
EnvCleanup
()
{}
class
FstWriter
{
public:
FstWriter
()
{
_wc
=
writerCtxCreate
(
TFile
,
tindex
,
false
,
64
*
1024
*
1024
);
_b
=
fstBuilderCreate
(
_wc
,
0
);
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
// char buf[128] = {0};
// int len = 0;
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstBuilderInsert
(
_b
,
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
~
FstWriter
()
{
fstBuilderFinish
(
_b
);
fstBuilderDestroy
(
_b
);
writerCtxDestroy
(
_wc
,
false
);
}
private:
FstBuilder
*
_b
;
WriterCtx
*
_wc
;
};
class
FstReadMemory
{
public:
FstReadMemory
(
size_t
size
)
{
_wc
=
writerCtxCreate
(
TFile
,
tindex
,
true
,
64
*
1024
);
_w
=
fstCountingWriterCreate
(
_wc
);
_size
=
size
;
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
}
bool
init
()
{
char
*
buf
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
_size
);
int
nRead
=
fstCountingWriterRead
(
_w
,
(
uint8_t
*
)
buf
,
_size
);
if
(
nRead
<=
0
)
{
return
false
;
}
_size
=
nRead
;
_s
=
fstSliceCreate
((
uint8_t
*
)
buf
,
_size
);
_fst
=
fstCreate
(
&
_s
);
free
(
buf
);
return
_fst
!=
NULL
;
}
bool
Get
(
const
std
::
string
&
key
,
uint64_t
*
val
)
{
// char buf[128] = {0};
// int len = 0;
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstGet
(
_fst
,
&
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
bool
GetWithTimeCostUs
(
const
std
::
string
&
key
,
uint64_t
*
val
,
uint64_t
*
elapse
)
{
int64_t
s
=
taosGetTimestampUs
();
bool
ok
=
this
->
Get
(
key
,
val
);
int64_t
e
=
taosGetTimestampUs
();
*
elapse
=
e
-
s
;
return
ok
;
}
// add later
bool
Search
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>&
result
)
{
FstStreamBuilder
*
sb
=
fstSearch
(
_fst
,
ctx
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
// result.push_back((uint64_t)(rt->out.out));
FstSlice
*
s
=
&
rt
->
data
;
int32_t
sz
=
0
;
char
*
ch
=
(
char
*
)
fstSliceData
(
s
,
&
sz
);
std
::
string
key
(
ch
,
sz
);
printf
(
"key: %s, val: %"
PRIu64
"
\n
"
,
key
.
c_str
(),
(
uint64_t
)(
rt
->
out
.
out
));
swsResultDestroy
(
rt
);
}
for
(
size_t
i
=
0
;
i
<
result
.
size
();
i
++
)
{
}
std
::
cout
<<
std
::
endl
;
return
true
;
}
bool
SearchWithTimeCostUs
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>&
result
)
{
int64_t
s
=
taosGetTimestampUs
();
bool
ok
=
this
->
Search
(
ctx
,
result
);
int64_t
e
=
taosGetTimestampUs
();
return
ok
;
}
~
FstReadMemory
()
{
fstCountingWriterDestroy
(
_w
);
fstDestroy
(
_fst
);
fstSliceDestroy
(
&
_s
);
writerCtxDestroy
(
_wc
,
false
);
tfCleanup
();
}
private:
FstCountingWriter
*
_w
;
Fst
*
_fst
;
FstSlice
_s
;
WriterCtx
*
_wc
;
size_t
_size
;
};
class
FstWriterEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
fw
=
new
FstWriter
();
}
virtual
void
TearDown
()
{
delete
fw
;
}
FstWriter
*
fw
=
NULL
;
};
class
FstReadEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
fr
=
new
FstReadMemory
(
1024
);
}
virtual
void
TearDown
()
{
delete
fr
;
}
FstReadMemory
*
fr
=
NULL
;
};
class
TFst
{
public:
void
CreateWriter
()
{
fw
=
new
FstWriter
;
}
void
ReCreateWriter
()
{
if
(
fw
!=
NULL
)
delete
fw
;
fw
=
new
FstWriter
;
}
void
DestroyWriter
()
{
if
(
fw
!=
NULL
)
delete
fw
;
}
void
CreateReader
()
{
fr
=
new
FstReadMemory
(
1024
);
fr
->
init
();
}
void
ReCreateReader
()
{
if
(
fr
!=
NULL
)
delete
fr
;
fr
=
new
FstReadMemory
(
1024
);
}
void
DestroyReader
()
{
delete
fr
;
fr
=
NULL
;
}
bool
Put
(
const
std
::
string
&
k
,
uint64_t
v
)
{
if
(
fw
==
NULL
)
{
return
false
;
}
return
fw
->
Put
(
k
,
v
);
}
bool
Get
(
const
std
::
string
&
k
,
uint64_t
*
v
)
{
if
(
fr
==
NULL
)
{
return
false
;
}
return
fr
->
Get
(
k
,
v
);
}
private:
FstWriter
*
fw
;
FstReadMemory
*
fr
;
};
class
FstEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
EnvInit
();
fst
=
new
TFst
;
}
virtual
void
TearDown
()
{
delete
fst
;
}
TFst
*
fst
;
};
TEST_F
(
FstEnv
,
writeNormal
)
{
fst
->
CreateWriter
();
std
::
string
str
(
"aa"
);
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
str
[
0
]
=
'a'
+
i
;
str
.
resize
(
2
);
assert
(
fst
->
Put
(
str
,
i
)
==
true
);
}
// order failed
assert
(
fst
->
Put
(
"aa"
,
1
)
==
false
);
fst
->
DestroyWriter
();
fst
->
CreateReader
();
uint64_t
val
;
assert
(
fst
->
Get
(
"a"
,
&
val
)
==
false
);
assert
(
fst
->
Get
(
"aa"
,
&
val
)
==
true
);
assert
(
val
==
0
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录