Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
192552be
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
192552be
编写于
12月 21, 2021
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update index TFile write
上级
5d4d7b47
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
175 addition
and
260 deletion
+175
-260
source/libs/index/inc/index_fst_registry.h
source/libs/index/inc/index_fst_registry.h
+1
-3
source/libs/index/inc/index_tfile.h
source/libs/index/inc/index_tfile.h
+16
-13
source/libs/index/src/index.c
source/libs/index/src/index.c
+13
-21
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+23
-19
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+43
-130
source/libs/index/src/index_fst_counting_writer.c
source/libs/index/src/index_fst_counting_writer.c
+7
-19
source/libs/index/src/index_fst_registry.c
source/libs/index/src/index_fst_registry.c
+5
-15
source/libs/index/src/index_fst_util.c
source/libs/index/src/index_fst_util.c
+2
-3
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+65
-37
未找到文件。
source/libs/index/inc/index_fst_registry.h
浏览文件 @
192552be
...
...
@@ -30,9 +30,7 @@ typedef struct FstRegistryCell {
#define FST_REGISTRY_CELL_IS_EMPTY(cell) (cell->addr == NONE_ADDRESS)
#define FST_REGISTRY_CELL_INSERT(cell, tAddr) \
do { \
cell->addr = tAddr; \
} while (0)
do { cell->addr = tAddr; } while (0)
// typedef struct FstRegistryCache {
// SArray *cells;
...
...
source/libs/index/inc/index_tfile.h
浏览文件 @
192552be
...
...
@@ -26,26 +26,26 @@
extern
"C"
{
#endif
// tfile header
// tfile header
content
// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->|
// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->|
typedef
struct
TFile
Read
Header
{
typedef
struct
TFileHeader
{
uint64_t
suid
;
int32_t
version
;
char
colName
[
128
];
//
uint8_t
colType
;
}
TFile
Read
Header
;
}
TFileHeader
;
#define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t));
#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
typedef
struct
TFileCacheKey
{
uint64_t
suid
;
uint8_t
colType
;
int32_t
version
;
c
onst
char
*
colName
;
int32_t
nColName
;
uint64_t
suid
;
uint8_t
colType
;
int32_t
version
;
c
har
*
colName
;
int32_t
nColName
;
}
TFileCacheKey
;
// table cache
...
...
@@ -59,13 +59,14 @@ typedef struct TFileCache {
typedef
struct
TFileWriter
{
FstBuilder
*
fb
;
WriterCtx
*
ctx
;
TFileHeader
header
;
}
TFileWriter
;
typedef
struct
TFileReader
{
T_REF_DECLARE
()
Fst
*
fst
;
WriterCtx
*
ctx
;
TFile
Read
Header
header
;
Fst
*
fst
;
WriterCtx
*
ctx
;
TFileHeader
header
;
}
TFileReader
;
typedef
struct
IndexTFile
{
...
...
@@ -94,11 +95,13 @@ void tfileCacheDestroy(TFileCache *tcache);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
);
void
tfileCachePut
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
,
TFileReader
*
reader
);
TFileReader
*
tfileReaderCreate
();
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
);
void
TFileReaderDestroy
(
TFileReader
*
reader
);
TFileWriter
*
tfileWriterCreate
(
const
char
*
suid
,
const
char
*
colName
);
TFileWriter
*
tfileWriterCreate
(
WriterCtx
*
ctx
,
TFileHeader
*
header
);
void
tfileWriterDestroy
(
TFileWriter
*
tw
);
int
tfileWriterPut
(
TFileWriter
*
tw
,
const
char
*
key
,
int32_t
nKey
,
const
char
*
val
,
int32_t
nVal
);
int
tfileWriterFinish
(
TFileWriter
*
tw
);
//
IndexTFile
*
indexTFileCreate
(
const
char
*
path
);
...
...
source/libs/index/src/index.c
浏览文件 @
192552be
...
...
@@ -49,15 +49,14 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oTyp
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
pthread_once
(
&
isInit
,
indexInit
);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
#ifdef USE_LUCENE
index_t
*
index
=
index_open
(
path
);
sIdx
->
index
=
index
;
#endif
#ifdef USE_INVERTED_INDEX
sIdx
->
cache
=
(
void
*
)
indexCacheCreate
();
sIdx
->
tindex
=
NULL
;
sIdx
->
colObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
...
...
@@ -67,6 +66,10 @@ int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) {
*
index
=
sIdx
;
return
0
;
#endif
*
index
=
NULL
;
return
-
1
;
}
void
indexClose
(
SIndex
*
sIdx
)
{
...
...
@@ -126,9 +129,7 @@ int indexPut(SIndex *index, SIndexMultiTerm *fVals, uint64_t uid) {
int32_t
colId
=
fi
->
colId
;
int32_t
version
=
index
->
cVersion
;
int
ret
=
indexCachePut
(
index
->
cache
,
p
,
colId
,
version
,
uid
);
if
(
ret
!=
0
)
{
return
ret
;
}
if
(
ret
!=
0
)
{
return
ret
;
}
}
#endif
...
...
@@ -158,9 +159,7 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
int
tsz
=
0
;
index_multi_search
(
index
->
index
,
(
const
char
**
)
fields
,
(
const
char
**
)
keys
,
types
,
nQuery
,
opera
,
&
tResult
,
&
tsz
);
for
(
int
i
=
0
;
i
<
tsz
;
i
++
)
{
taosArrayPush
(
result
,
&
tResult
[
i
]);
}
for
(
int
i
=
0
;
i
<
tsz
;
i
++
)
{
taosArrayPush
(
result
,
&
tResult
[
i
]);
}
for
(
int
i
=
0
;
i
<
nQuery
;
i
++
)
{
free
(
fields
[
i
]);
...
...
@@ -216,9 +215,7 @@ void indexOptsDestroy(SIndexOpts *opts){
SIndexMultiTermQuery
*
indexMultiTermQueryCreate
(
EIndexOperatorType
opera
)
{
SIndexMultiTermQuery
*
p
=
(
SIndexMultiTermQuery
*
)
malloc
(
sizeof
(
SIndexMultiTermQuery
));
if
(
p
==
NULL
)
{
return
NULL
;
}
if
(
p
==
NULL
)
{
return
NULL
;
}
p
->
opera
=
opera
;
p
->
query
=
taosArrayInit
(
4
,
sizeof
(
SIndexTermQuery
));
return
p
;
...
...
@@ -240,9 +237,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EInde
SIndexTerm
*
indexTermCreate
(
int64_t
suid
,
SIndexOperOnColumn
oper
,
uint8_t
colType
,
const
char
*
colName
,
int32_t
nColName
,
const
char
*
colVal
,
int32_t
nColVal
)
{
SIndexTerm
*
t
=
(
SIndexTerm
*
)
calloc
(
1
,
(
sizeof
(
SIndexTerm
)));
if
(
t
==
NULL
)
{
return
NULL
;
}
if
(
t
==
NULL
)
{
return
NULL
;
}
t
->
suid
=
suid
;
t
->
operType
=
oper
;
...
...
@@ -320,9 +315,7 @@ static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result
return
0
;
}
static
void
indexInterResultsDestroy
(
SArray
*
results
)
{
if
(
results
==
NULL
)
{
return
;
}
if
(
results
==
NULL
)
{
return
;
}
size_t
sz
=
taosArrayGetSize
(
results
);
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -336,6 +329,7 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType
SArray
*
first
=
taosArrayGetP
(
interResults
,
0
);
taosArraySort
(
first
,
uidCompare
);
taosArrayRemoveDuplicate
(
first
,
uidCompare
,
NULL
);
if
(
oType
==
MUST
)
{
// just one column index, enhance later
taosArrayAddAll
(
fResults
,
first
);
...
...
@@ -351,9 +345,7 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType
return
0
;
}
static
int
indexMergeCacheIntoTindex
(
SIndex
*
sIdx
)
{
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
indexWarn
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
return
0
;
}
source/libs/index/src/index_cache.c
浏览文件 @
192552be
...
...
@@ -40,9 +40,7 @@ static int32_t compareKey(const void *l, const void *r) {
int16_t
lf
,
rf
;
// field id
memcpy
(
&
lf
,
lp
,
sizeof
(
lf
));
memcpy
(
&
rf
,
rp
,
sizeof
(
rf
));
if
(
lf
!=
rf
)
{
return
lf
<
rf
?
-
1
:
1
;
}
if
(
lf
!=
rf
)
{
return
lf
<
rf
?
-
1
:
1
;
}
lp
+=
sizeof
(
lf
);
rp
+=
sizeof
(
rf
);
...
...
@@ -89,9 +87,8 @@ static int32_t compareKey(const void *l, const void *r) {
int32_t
lv
,
rv
;
memcpy
(
&
lv
,
lp
,
sizeof
(
lv
));
memcpy
(
&
rv
,
rp
,
sizeof
(
rv
));
if
(
lv
!=
rv
)
{
return
lv
>
rv
?
-
1
:
1
;
}
if
(
lv
!=
rv
)
{
return
lv
>
rv
?
-
1
:
1
;
}
lp
+=
sizeof
(
lv
);
rp
+=
sizeof
(
rv
);
// not care item type
...
...
@@ -100,6 +97,10 @@ static int32_t compareKey(const void *l, const void *r) {
}
IndexCache
*
indexCacheCreate
()
{
IndexCache
*
cache
=
calloc
(
1
,
sizeof
(
IndexCache
));
if
(
cache
==
NULL
)
{
indexError
(
"failed to create index cache"
);
return
NULL
;
}
cache
->
skiplist
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
MAX_INDEX_KEY_LEN
,
compareKey
,
SL_ALLOW_DUP_KEY
,
getIndexKey
);
return
cache
;
...
...
@@ -107,23 +108,20 @@ IndexCache *indexCacheCreate() {
void
indexCacheDestroy
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
if
(
pCache
==
NULL
)
{
return
;
}
if
(
pCache
==
NULL
)
{
return
;
}
tSkipListDestroy
(
pCache
->
skiplist
);
free
(
pCache
);
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
if
(
cache
==
NULL
)
{
return
-
1
;
}
IndexCache
*
pCache
=
cache
;
// encode data
int32_t
total
=
CACHE_KEY_LEN
(
term
);
char
*
buf
=
calloc
(
1
,
total
);
char
*
p
=
buf
;
char
*
buf
=
calloc
(
1
,
total
);
char
*
p
=
buf
;
SERIALIZE_VAR_TO_BUF
(
p
,
total
,
int32_t
);
SERIALIZE_VAR_TO_BUF
(
p
,
colId
,
int16_t
);
...
...
@@ -145,11 +143,13 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t
IndexCache
*
pCache
=
cache
;
return
0
;
}
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
IndexCache
*
pCache
=
cache
;
SIndexTerm
*
term
=
query
->
term
;
EIndexQueryType
qtype
=
query
->
qType
;
...
...
@@ -158,9 +158,13 @@ int indexCacheSearch(
char
*
buf
=
calloc
(
1
,
keyLen
);
if
(
qtype
==
QUERY_TERM
)
{
//
}
else
if
(
qtype
==
QUERY_PREFIX
)
{
//
}
else
if
(
qtype
==
QUERY_SUFFIX
)
{
//
}
else
if
(
qtype
==
QUERY_REGEX
)
{
//
}
return
0
;
...
...
source/libs/index/src/index_fst.c
浏览文件 @
192552be
...
...
@@ -31,9 +31,7 @@ static uint8_t fstPackDetla(FstCountingWriter *wrt, CompiledAddr nodeAddr, Compi
FstUnFinishedNodes
*
fstUnFinishedNodesCreate
()
{
FstUnFinishedNodes
*
nodes
=
malloc
(
sizeof
(
FstUnFinishedNodes
));
if
(
nodes
==
NULL
)
{
return
NULL
;
}
if
(
nodes
==
NULL
)
{
return
NULL
;
}
nodes
->
stack
=
(
SArray
*
)
taosArrayInit
(
64
,
sizeof
(
FstBuilderNodeUnfinished
));
fstUnFinishedNodesPushEmpty
(
nodes
,
false
);
...
...
@@ -46,9 +44,7 @@ void unFinishedNodeDestroyElem(void *elem) {
b
->
last
=
NULL
;
}
void
fstUnFinishedNodesDestroy
(
FstUnFinishedNodes
*
nodes
)
{
if
(
nodes
==
NULL
)
{
return
;
}
if
(
nodes
==
NULL
)
{
return
;
}
taosArrayDestroyEx
(
nodes
->
stack
,
unFinishedNodeDestroyElem
);
free
(
nodes
);
...
...
@@ -97,9 +93,7 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *nodes, CompiledAddr add
}
void
fstUnFinishedNodesAddSuffix
(
FstUnFinishedNodes
*
nodes
,
FstSlice
bs
,
Output
out
)
{
FstSlice
*
s
=
&
bs
;
if
(
fstSliceIsEmpty
(
s
))
{
return
;
}
if
(
fstSliceIsEmpty
(
s
))
{
return
;
}
size_t
sz
=
taosArrayGetSize
(
nodes
->
stack
)
-
1
;
FstBuilderNodeUnfinished
*
un
=
taosArrayGet
(
nodes
->
stack
,
sz
);
assert
(
un
->
last
==
NULL
);
...
...
@@ -179,9 +173,7 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node,
FstState
fstStateCreateFrom
(
FstSlice
*
slice
,
CompiledAddr
addr
)
{
FstState
fs
=
{.
state
=
EmptyFinal
,
.
val
=
0
};
if
(
addr
==
EMPTY_ADDRESS
)
{
return
fs
;
}
if
(
addr
==
EMPTY_ADDRESS
)
{
return
fs
;
}
uint8_t
*
data
=
fstSliceData
(
slice
,
NULL
);
uint8_t
v
=
data
[
addr
];
...
...
@@ -236,9 +228,7 @@ void fstStateCompileForOneTrans(FstCountingWriter *w, CompiledAddr addr, FstTran
fstStateSetCommInput
(
&
st
,
trn
->
inp
);
bool
null
=
false
;
uint8_t
inp
=
fstStateCommInput
(
&
st
,
&
null
);
if
(
null
==
true
)
{
fstCountingWriterWrite
(
w
,
(
char
*
)
&
trn
->
inp
,
sizeof
(
trn
->
inp
));
}
if
(
null
==
true
)
{
fstCountingWriterWrite
(
w
,
(
char
*
)
&
trn
->
inp
,
sizeof
(
trn
->
inp
));
}
fstCountingWriterWrite
(
w
,
(
char
*
)(
&
(
st
.
val
)),
sizeof
(
st
.
val
));
return
;
}
...
...
@@ -272,9 +262,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuil
fstStateSetStateNtrans
(
&
st
,
(
uint8_t
)
sz
);
if
(
anyOuts
)
{
if
(
FST_BUILDER_NODE_IS_FINAL
(
node
))
{
fstCountingWriterPackUintIn
(
w
,
node
->
finalOutput
,
oSize
);
}
if
(
FST_BUILDER_NODE_IS_FINAL
(
node
))
{
fstCountingWriterPackUintIn
(
w
,
node
->
finalOutput
,
oSize
);
}
for
(
int32_t
i
=
sz
-
1
;
i
>=
0
;
i
--
)
{
FstTransition
*
t
=
taosArrayGet
(
node
->
trans
,
i
);
fstCountingWriterPackUintIn
(
w
,
t
->
out
,
oSize
);
...
...
@@ -439,9 +427,7 @@ Output fstStateOutput(FstState *s, FstNode *node) {
assert
(
s
->
state
==
OneTrans
);
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
node
->
sizes
);
if
(
oSizes
==
0
)
{
return
0
;
}
if
(
oSizes
==
0
)
{
return
0
;
}
FstSlice
*
slice
=
&
node
->
data
;
uint8_t
tSizes
=
FST_GET_TRANSITION_PACK_SIZE
(
node
->
sizes
);
...
...
@@ -453,9 +439,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
assert
(
s
->
state
==
AnyTrans
);
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
node
->
sizes
);
if
(
oSizes
==
0
)
{
return
0
;
}
if
(
oSizes
==
0
)
{
return
0
;
}
FstSlice
*
slice
=
&
node
->
data
;
uint8_t
*
data
=
fstSliceData
(
slice
,
NULL
);
uint64_t
at
=
node
->
start
-
fstStateNtransLen
(
s
)
-
1
// pack size
...
...
@@ -468,9 +452,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
void
fstStateSetFinalState
(
FstState
*
s
,
bool
yes
)
{
assert
(
s
->
state
==
AnyTrans
);
if
(
yes
)
{
s
->
val
|=
0
b01000000
;
}
if
(
yes
)
{
s
->
val
|=
0
b01000000
;
}
return
;
}
bool
fstStateIsFinalState
(
FstState
*
s
)
{
...
...
@@ -480,9 +462,7 @@ bool fstStateIsFinalState(FstState *s) {
void
fstStateSetStateNtrans
(
FstState
*
s
,
uint8_t
n
)
{
assert
(
s
->
state
==
AnyTrans
);
if
(
n
<=
0
b00111111
)
{
s
->
val
=
(
s
->
val
&
0
b11000000
)
|
n
;
}
if
(
n
<=
0
b00111111
)
{
s
->
val
=
(
s
->
val
&
0
b11000000
)
|
n
;
}
return
;
}
// state_ntrans
...
...
@@ -514,9 +494,7 @@ uint64_t fstStateNtransLen(FstState *s) {
uint64_t
fstStateNtrans
(
FstState
*
s
,
FstSlice
*
slice
)
{
bool
null
=
false
;
uint8_t
n
=
fstStateStateNtrans
(
s
,
&
null
);
if
(
null
!=
true
)
{
return
n
;
}
if
(
null
!=
true
)
{
return
n
;
}
int32_t
len
;
uint8_t
*
data
=
fstSliceData
(
slice
,
&
len
);
n
=
data
[
len
-
2
];
...
...
@@ -526,9 +504,7 @@ uint64_t fstStateNtrans(FstState *s, FstSlice *slice) {
}
Output
fstStateFinalOutput
(
FstState
*
s
,
uint64_t
version
,
FstSlice
*
slice
,
PackSizes
sizes
,
uint64_t
nTrans
)
{
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
sizes
);
if
(
oSizes
==
0
||
!
fstStateIsFinalState
(
s
))
{
return
0
;
}
if
(
oSizes
==
0
||
!
fstStateIsFinalState
(
s
))
{
return
0
;
}
uint64_t
at
=
FST_SLICE_LEN
(
slice
)
-
1
-
fstStateNtransLen
(
s
)
-
1
// pack size
-
fstStateTotalTransSize
(
s
,
version
,
sizes
,
nTrans
)
-
(
nTrans
*
oSizes
)
-
oSizes
;
...
...
@@ -545,9 +521,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
uint8_t
*
data
=
fstSliceData
(
slice
,
&
dlen
);
uint64_t
i
=
data
[
at
+
b
];
// uint64_t i = slice->data[slice->start + at + b];
if
(
i
>=
node
->
nTrans
)
{
*
null
=
true
;
}
if
(
i
>=
node
->
nTrans
)
{
*
null
=
true
;
}
return
i
;
}
else
{
uint64_t
start
=
node
->
start
-
fstStateNtransLen
(
s
)
-
1
// pack size
...
...
@@ -564,9 +538,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
return
node
->
nTrans
-
i
-
1
;
// bug
}
}
if
(
i
==
len
)
{
*
null
=
true
;
}
if
(
i
==
len
)
{
*
null
=
true
;
}
fstSliceDestroy
(
&
t
);
}
}
...
...
@@ -575,9 +547,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
FstNode
*
fstNodeCreate
(
int64_t
version
,
CompiledAddr
addr
,
FstSlice
*
slice
)
{
FstNode
*
n
=
(
FstNode
*
)
malloc
(
sizeof
(
FstNode
));
if
(
n
==
NULL
)
{
return
NULL
;
}
if
(
n
==
NULL
)
{
return
NULL
;
}
FstState
st
=
fstStateCreateFrom
(
slice
,
addr
);
...
...
@@ -643,9 +613,7 @@ void fstNodeDestroy(FstNode *node) {
}
FstTransitions
*
fstNodeTransitions
(
FstNode
*
node
)
{
FstTransitions
*
t
=
malloc
(
sizeof
(
FstTransitions
));
if
(
NULL
==
t
)
{
return
NULL
;
}
if
(
NULL
==
t
)
{
return
NULL
;
}
FstRange
range
=
{.
start
=
0
,
.
end
=
FST_NODE_LEN
(
node
)};
t
->
range
=
range
;
t
->
node
=
node
;
...
...
@@ -752,9 +720,7 @@ bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, Compiled
FstBuilder
*
fstBuilderCreate
(
void
*
w
,
FstType
ty
)
{
FstBuilder
*
b
=
malloc
(
sizeof
(
FstBuilder
));
if
(
NULL
==
b
)
{
return
b
;
}
if
(
NULL
==
b
)
{
return
b
;
}
b
->
wrt
=
fstCountingWriterCreate
(
w
);
b
->
unfinished
=
fstUnFinishedNodesCreate
();
...
...
@@ -776,9 +742,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) {
return
b
;
}
void
fstBuilderDestroy
(
FstBuilder
*
b
)
{
if
(
b
==
NULL
)
{
return
;
}
if
(
b
==
NULL
)
{
return
;
}
fstCountingWriterDestroy
(
b
->
wrt
);
fstUnFinishedNodesDestroy
(
b
->
unfinished
);
...
...
@@ -879,9 +843,7 @@ CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn) {
fstBuilderNodeCompileTo
(
bn
,
b
->
wrt
,
b
->
lastAddr
,
startAddr
);
b
->
lastAddr
=
(
CompiledAddr
)(
FST_WRITER_COUNT
(
b
->
wrt
)
-
1
);
if
(
entry
->
state
==
NOTFOUND
)
{
FST_REGISTRY_CELL_INSERT
(
entry
->
cell
,
b
->
lastAddr
);
}
if
(
entry
->
state
==
NOTFOUND
)
{
FST_REGISTRY_CELL_INSERT
(
entry
->
cell
,
b
->
lastAddr
);
}
fstRegistryEntryDestroy
(
entry
);
return
b
->
lastAddr
;
...
...
@@ -924,9 +886,7 @@ FstSlice fstNodeAsSlice(FstNode *node) {
FstLastTransition
*
fstLastTransitionCreate
(
uint8_t
inp
,
Output
out
)
{
FstLastTransition
*
trn
=
malloc
(
sizeof
(
FstLastTransition
));
if
(
trn
==
NULL
)
{
return
NULL
;
}
if
(
trn
==
NULL
)
{
return
NULL
;
}
trn
->
inp
=
inp
;
trn
->
out
=
out
;
...
...
@@ -936,9 +896,7 @@ FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out) {
void
fstLastTransitionDestroy
(
FstLastTransition
*
trn
)
{
free
(
trn
);
}
void
fstBuilderNodeUnfinishedLastCompiled
(
FstBuilderNodeUnfinished
*
unNode
,
CompiledAddr
addr
)
{
FstLastTransition
*
trn
=
unNode
->
last
;
if
(
trn
==
NULL
)
{
return
;
}
if
(
trn
==
NULL
)
{
return
;
}
FstTransition
t
=
{.
inp
=
trn
->
inp
,
.
out
=
trn
->
out
,
.
addr
=
addr
};
taosArrayPush
(
unNode
->
node
->
trans
,
&
t
);
fstLastTransitionDestroy
(
trn
);
...
...
@@ -947,35 +905,27 @@ void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished *unNode, Comp
}
void
fstBuilderNodeUnfinishedAddOutputPrefix
(
FstBuilderNodeUnfinished
*
unNode
,
Output
out
)
{
if
(
FST_BUILDER_NODE_IS_FINAL
(
unNode
->
node
))
{
unNode
->
node
->
finalOutput
+=
out
;
}
if
(
FST_BUILDER_NODE_IS_FINAL
(
unNode
->
node
))
{
unNode
->
node
->
finalOutput
+=
out
;
}
size_t
sz
=
taosArrayGetSize
(
unNode
->
node
->
trans
);
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
FstTransition
*
trn
=
taosArrayGet
(
unNode
->
node
->
trans
,
i
);
trn
->
out
+=
out
;
}
if
(
unNode
->
last
)
{
unNode
->
last
->
out
+=
out
;
}
if
(
unNode
->
last
)
{
unNode
->
last
->
out
+=
out
;
}
return
;
}
Fst
*
fstCreate
(
FstSlice
*
slice
)
{
int32_t
slen
;
char
*
buf
=
fstSliceData
(
slice
,
&
slen
);
if
(
slen
<
36
)
{
return
NULL
;
}
if
(
slen
<
36
)
{
return
NULL
;
}
uint64_t
len
=
slen
;
uint64_t
skip
=
0
;
uint64_t
version
;
taosDecodeFixedU64
(
buf
,
&
version
);
skip
+=
sizeof
(
version
);
if
(
version
==
0
||
version
>
VERSION
)
{
return
NULL
;
}
if
(
version
==
0
||
version
>
VERSION
)
{
return
NULL
;
}
uint64_t
type
;
taosDecodeFixedU64
(
buf
+
skip
,
&
type
);
...
...
@@ -994,14 +944,10 @@ Fst *fstCreate(FstSlice *slice) {
taosDecodeFixedU64
(
buf
+
len
,
&
fstLen
);
// TODO(validate root addr)
Fst
*
fst
=
(
Fst
*
)
calloc
(
1
,
sizeof
(
Fst
));
if
(
fst
==
NULL
)
{
return
NULL
;
}
if
(
fst
==
NULL
)
{
return
NULL
;
}
fst
->
meta
=
(
FstMeta
*
)
malloc
(
sizeof
(
FstMeta
));
if
(
NULL
==
fst
->
meta
)
{
goto
FST_CREAT_FAILED
;
}
if
(
NULL
==
fst
->
meta
)
{
goto
FST_CREAT_FAILED
;
}
fst
->
meta
->
version
=
version
;
fst
->
meta
->
rootAddr
=
rootAddr
;
...
...
@@ -1039,9 +985,7 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
for
(
uint32_t
i
=
0
;
i
<
len
;
i
++
)
{
uint8_t
inp
=
data
[
i
];
Output
res
=
0
;
if
(
false
==
fstNodeFindInput
(
root
,
inp
,
&
res
))
{
return
false
;
}
if
(
false
==
fstNodeFindInput
(
root
,
inp
,
&
res
))
{
return
false
;
}
FstTransition
trn
;
fstNodeGetTransitionAt
(
root
,
res
,
&
trn
);
...
...
@@ -1068,17 +1012,13 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
}
FstStreamBuilder
*
fstSearch
(
Fst
*
fst
,
AutomationCtx
*
ctx
)
{
return
fstStreamBuilderCreate
(
fst
,
ctx
);
}
StreamWithState
*
streamBuilderIntoStream
(
FstStreamBuilder
*
sb
)
{
if
(
sb
==
NULL
)
{
return
NULL
;
}
if
(
sb
==
NULL
)
{
return
NULL
;
}
return
streamWithStateCreate
(
sb
->
fst
,
sb
->
aut
,
sb
->
min
,
sb
->
max
);
}
FstStreamWithStateBuilder
*
fstSearchWithState
(
Fst
*
fst
,
AutomationCtx
*
ctx
)
{
return
fstStreamBuilderCreate
(
fst
,
ctx
);
}
FstNode
*
fstGetRoot
(
Fst
*
fst
)
{
if
(
fst
->
root
!=
NULL
)
{
return
fst
->
root
;
}
if
(
fst
->
root
!=
NULL
)
{
return
fst
->
root
;
}
CompiledAddr
rAddr
=
fstGetRootAddr
(
fst
);
fst
->
root
=
fstGetNode
(
fst
,
rAddr
);
return
fst
->
root
;
...
...
@@ -1104,18 +1044,14 @@ bool fstVerify(Fst *fst) {
int32_t
len
;
uint8_t
*
data
=
fstSliceData
(
fst
->
data
,
&
len
);
TSCKSUM
initSum
=
0
;
if
(
!
taosCheckChecksumWhole
(
data
,
len
))
{
return
false
;
}
if
(
!
taosCheckChecksumWhole
(
data
,
len
))
{
return
false
;
}
return
true
;
}
// data bound function
FstBoundWithData
*
fstBoundStateCreate
(
FstBound
type
,
FstSlice
*
data
)
{
FstBoundWithData
*
b
=
calloc
(
1
,
sizeof
(
FstBoundWithData
));
if
(
b
==
NULL
)
{
return
NULL
;
}
if
(
b
==
NULL
)
{
return
NULL
;
}
if
(
data
!=
NULL
)
{
b
->
data
=
fstSliceCopy
(
data
,
data
->
start
,
data
->
end
);
...
...
@@ -1152,9 +1088,7 @@ void fstBoundDestroy(FstBoundWithData *bound) { free(bound); }
StreamWithState
*
streamWithStateCreate
(
Fst
*
fst
,
AutomationCtx
*
automation
,
FstBoundWithData
*
min
,
FstBoundWithData
*
max
)
{
StreamWithState
*
sws
=
calloc
(
1
,
sizeof
(
StreamWithState
));
if
(
sws
==
NULL
)
{
return
NULL
;
}
if
(
sws
==
NULL
)
{
return
NULL
;
}
sws
->
fst
=
fst
;
sws
->
aut
=
automation
;
...
...
@@ -1170,9 +1104,7 @@ StreamWithState *streamWithStateCreate(
return
sws
;
}
void
streamWithStateDestroy
(
StreamWithState
*
sws
)
{
if
(
sws
==
NULL
)
{
return
;
}
if
(
sws
==
NULL
)
{
return
;
}
taosArrayDestroy
(
sws
->
inp
);
taosArrayDestroyEx
(
sws
->
stack
,
streamStateDestroy
);
...
...
@@ -1195,7 +1127,6 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
}
FstSlice
*
key
=
NULL
;
bool
inclusize
=
false
;
;
if
(
min
->
type
==
Included
)
{
key
=
&
min
->
data
;
...
...
@@ -1239,9 +1170,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
uint64_t
i
=
0
;
for
(
i
=
trans
->
range
.
start
;
i
<
trans
->
range
.
end
;
i
++
)
{
FstTransition
trn
;
if
(
fstNodeGetTransitionAt
(
node
,
i
,
&
trn
)
&&
trn
.
inp
>
b
)
{
break
;
}
if
(
fstNodeGetTransitionAt
(
node
,
i
,
&
trn
)
&&
trn
.
inp
>
b
)
{
break
;
}
}
StreamState
s
=
{.
node
=
node
,
.
trans
=
i
,
.
out
=
{.
null
=
false
,
.
out
=
out
},
.
autState
=
autState
};
...
...
@@ -1289,9 +1218,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
while
(
taosArrayGetSize
(
sws
->
stack
)
>
0
)
{
StreamState
*
p
=
(
StreamState
*
)
taosArrayPop
(
sws
->
stack
);
if
(
p
->
trans
>=
FST_NODE_LEN
(
p
->
node
)
||
!
automFuncs
[
aut
->
type
].
canMatch
(
aut
,
p
->
autState
))
{
if
(
FST_NODE_ADDR
(
p
->
node
)
!=
fstGetRootAddr
(
sws
->
fst
))
{
taosArrayPop
(
sws
->
inp
);
}
if
(
FST_NODE_ADDR
(
p
->
node
)
!=
fstGetRootAddr
(
sws
->
fst
))
{
taosArrayPop
(
sws
->
inp
);
}
streamStateDestroy
(
p
);
continue
;
}
...
...
@@ -1308,9 +1235,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
if
(
FST_NODE_IS_FINAL
(
nextNode
))
{
// void *eofState = sws->aut->acceptEof(nextState);
void
*
eofState
=
automFuncs
[
aut
->
type
].
acceptEof
(
aut
,
nextState
);
if
(
eofState
!=
NULL
)
{
isMatch
=
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
eofState
);
}
if
(
eofState
!=
NULL
)
{
isMatch
=
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
eofState
);
}
}
StreamState
s1
=
{.
node
=
p
->
node
,
.
trans
=
p
->
trans
+
1
,
.
out
=
p
->
out
,
.
autState
=
p
->
autState
};
taosArrayPush
(
sws
->
stack
,
&
s1
);
...
...
@@ -1320,9 +1245,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
size_t
isz
=
taosArrayGetSize
(
sws
->
inp
);
uint8_t
*
buf
=
(
uint8_t
*
)
malloc
(
isz
*
sizeof
(
uint8_t
));
for
(
uint32_t
i
=
0
;
i
<
isz
;
i
++
)
{
buf
[
i
]
=
*
(
uint8_t
*
)
taosArrayGet
(
sws
->
inp
,
i
);
}
for
(
uint32_t
i
=
0
;
i
<
isz
;
i
++
)
{
buf
[
i
]
=
*
(
uint8_t
*
)
taosArrayGet
(
sws
->
inp
,
i
);
}
FstSlice
slice
=
fstSliceCreate
(
buf
,
taosArrayGetSize
(
sws
->
inp
));
if
(
fstBoundWithDataExceededBy
(
sws
->
endAt
,
&
slice
))
{
taosArrayDestroyEx
(
sws
->
stack
,
streamStateDestroy
);
...
...
@@ -1351,9 +1274,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
StreamWithStateResult
*
swsResultCreate
(
FstSlice
*
data
,
FstOutput
fOut
,
void
*
state
)
{
StreamWithStateResult
*
result
=
calloc
(
1
,
sizeof
(
StreamWithStateResult
));
if
(
result
==
NULL
)
{
return
NULL
;
}
if
(
result
==
NULL
)
{
return
NULL
;
}
result
->
data
=
fstSliceCopy
(
data
,
0
,
FST_SLICE_LEN
(
data
)
-
1
);
result
->
out
=
fOut
;
...
...
@@ -1362,9 +1283,7 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta
return
result
;
}
void
swsResultDestroy
(
StreamWithStateResult
*
result
)
{
if
(
NULL
==
result
)
{
return
;
}
if
(
NULL
==
result
)
{
return
;
}
fstSliceDestroy
(
&
result
->
data
);
startWithStateValueDestroy
(
result
->
state
);
...
...
@@ -1372,9 +1291,7 @@ void swsResultDestroy(StreamWithStateResult *result) {
}
void
streamStateDestroy
(
void
*
s
)
{
if
(
NULL
==
s
)
{
return
;
}
if
(
NULL
==
s
)
{
return
;
}
StreamState
*
ss
=
(
StreamState
*
)
s
;
fstNodeDestroy
(
ss
->
node
);
...
...
@@ -1383,9 +1300,7 @@ void streamStateDestroy(void *s) {
FstStreamBuilder
*
fstStreamBuilderCreate
(
Fst
*
fst
,
AutomationCtx
*
aut
)
{
FstStreamBuilder
*
b
=
calloc
(
1
,
sizeof
(
FstStreamBuilder
));
if
(
NULL
==
b
)
{
return
NULL
;
}
if
(
NULL
==
b
)
{
return
NULL
;
}
b
->
fst
=
fst
;
b
->
aut
=
aut
;
...
...
@@ -1401,9 +1316,7 @@ void fstStreamBuilderDestroy(FstStreamBuilder *b) {
free
(
b
);
}
FstStreamBuilder
*
fstStreamBuilderRange
(
FstStreamBuilder
*
b
,
FstSlice
*
val
,
RangeType
type
)
{
if
(
b
==
NULL
)
{
return
NULL
;
}
if
(
b
==
NULL
)
{
return
NULL
;
}
if
(
type
==
GE
)
{
b
->
min
->
type
=
Included
;
...
...
source/libs/index/src/index_fst_counting_writer.c
浏览文件 @
192552be
...
...
@@ -18,9 +18,7 @@
#include "tutil.h"
static
int
writeCtxDoWrite
(
WriterCtx
*
ctx
,
uint8_t
*
buf
,
int
len
)
{
if
(
ctx
->
offset
+
len
>
ctx
->
limit
)
{
return
-
1
;
}
if
(
ctx
->
offset
+
len
>
ctx
->
limit
)
{
return
-
1
;
}
if
(
ctx
->
type
==
TFile
)
{
assert
(
len
==
tfWrite
(
ctx
->
file
.
fd
,
buf
,
len
));
...
...
@@ -53,9 +51,7 @@ static int writeCtxDoFlush(WriterCtx *ctx) {
WriterCtx
*
writerCtxCreate
(
WriterType
type
,
const
char
*
path
,
bool
readOnly
,
int32_t
capacity
)
{
WriterCtx
*
ctx
=
calloc
(
1
,
sizeof
(
WriterCtx
));
if
(
ctx
==
NULL
)
{
return
NULL
;
}
if
(
ctx
==
NULL
)
{
return
NULL
;
}
ctx
->
type
=
type
;
if
(
ctx
->
type
==
TFile
)
{
...
...
@@ -67,8 +63,8 @@ WriterCtx *writerCtxCreate(WriterType type, const char *path, bool readOnly, int
ctx
->
file
.
fd
=
tfOpenReadWrite
(
tmpFile
);
}
if
(
ctx
->
file
.
fd
<
0
)
{
goto
END
;
indexError
(
"open file error %d"
,
errno
);
goto
END
;
}
}
else
if
(
ctx
->
type
==
TMemory
)
{
ctx
->
mem
.
buf
=
calloc
(
1
,
sizeof
(
char
)
*
capacity
);
...
...
@@ -83,9 +79,7 @@ WriterCtx *writerCtxCreate(WriterType type, const char *path, bool readOnly, int
return
ctx
;
END:
if
(
ctx
->
type
==
TMemory
)
{
free
(
ctx
->
mem
.
buf
);
}
if
(
ctx
->
type
==
TMemory
)
{
free
(
ctx
->
mem
.
buf
);
}
free
(
ctx
);
}
void
writerCtxDestroy
(
WriterCtx
*
ctx
)
{
...
...
@@ -99,9 +93,7 @@ void writerCtxDestroy(WriterCtx *ctx) {
FstCountingWriter
*
fstCountingWriterCreate
(
void
*
wrt
)
{
FstCountingWriter
*
cw
=
calloc
(
1
,
sizeof
(
FstCountingWriter
));
if
(
cw
==
NULL
)
{
return
NULL
;
}
if
(
cw
==
NULL
)
{
return
NULL
;
}
cw
->
wrt
=
wrt
;
//(void *)(writerCtxCreate(TFile, readOnly));
...
...
@@ -115,9 +107,7 @@ void fstCountingWriterDestroy(FstCountingWriter *cw) {
}
int
fstCountingWriterWrite
(
FstCountingWriter
*
write
,
uint8_t
*
buf
,
uint32_t
len
)
{
if
(
write
==
NULL
)
{
return
0
;
}
if
(
write
==
NULL
)
{
return
0
;
}
// update checksum
// write data to file/socket or mem
WriterCtx
*
ctx
=
write
->
wrt
;
...
...
@@ -128,9 +118,7 @@ int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len)
return
len
;
}
int
fstCountingWriterRead
(
FstCountingWriter
*
write
,
uint8_t
*
buf
,
uint32_t
len
)
{
if
(
write
==
NULL
)
{
return
0
;
}
if
(
write
==
NULL
)
{
return
0
;
}
WriterCtx
*
ctx
=
write
->
wrt
;
int
nRead
=
ctx
->
read
(
ctx
,
buf
,
len
);
// assert(nRead == len);
...
...
source/libs/index/src/index_fst_registry.c
浏览文件 @
192552be
...
...
@@ -34,9 +34,7 @@ uint64_t fstRegistryHash(FstRegistry *registry, FstBuilderNode *bNode) {
}
static
void
fstRegistryCellSwap
(
SArray
*
arr
,
uint32_t
a
,
uint32_t
b
)
{
size_t
sz
=
taosArrayGetSize
(
arr
);
if
(
a
>=
sz
||
b
>=
sz
)
{
return
;
}
if
(
a
>=
sz
||
b
>=
sz
)
{
return
;
}
FstRegistryCell
*
cell1
=
(
FstRegistryCell
*
)
taosArrayGet
(
arr
,
a
);
FstRegistryCell
*
cell2
=
(
FstRegistryCell
*
)
taosArrayGet
(
arr
,
b
);
...
...
@@ -53,9 +51,7 @@ static void fstRegistryCellSwap(SArray *arr, uint32_t a, uint32_t b) {
static
void
fstRegistryCellPromote
(
SArray
*
arr
,
uint32_t
start
,
uint32_t
end
)
{
size_t
sz
=
taosArrayGetSize
(
arr
);
if
(
start
>=
sz
&&
end
>=
sz
)
{
return
;
}
if
(
start
>=
sz
&&
end
>=
sz
)
{
return
;
}
assert
(
start
>=
end
);
...
...
@@ -69,9 +65,7 @@ static void fstRegistryCellPromote(SArray *arr, uint32_t start, uint32_t end) {
FstRegistry
*
fstRegistryCreate
(
uint64_t
tableSize
,
uint64_t
mruSize
)
{
FstRegistry
*
registry
=
malloc
(
sizeof
(
FstRegistry
));
if
(
registry
==
NULL
)
{
return
NULL
;
}
if
(
registry
==
NULL
)
{
return
NULL
;
}
uint64_t
nCells
=
tableSize
*
mruSize
;
SArray
*
tb
=
(
SArray
*
)
taosArrayInit
(
nCells
,
sizeof
(
FstRegistryCell
));
...
...
@@ -92,9 +86,7 @@ FstRegistry *fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) {
}
void
fstRegistryDestroy
(
FstRegistry
*
registry
)
{
if
(
registry
==
NULL
)
{
return
;
}
if
(
registry
==
NULL
)
{
return
;
}
SArray
*
tb
=
registry
->
table
;
size_t
sz
=
taosArrayGetSize
(
tb
);
...
...
@@ -107,9 +99,7 @@ void fstRegistryDestroy(FstRegistry *registry) {
}
FstRegistryEntry
*
fstRegistryGetEntry
(
FstRegistry
*
registry
,
FstBuilderNode
*
bNode
)
{
if
(
taosArrayGetSize
(
registry
->
table
)
<=
0
)
{
return
NULL
;
}
if
(
taosArrayGetSize
(
registry
->
table
)
<=
0
)
{
return
NULL
;
}
uint64_t
bucket
=
fstRegistryHash
(
registry
,
bNode
);
uint64_t
start
=
registry
->
mruSize
*
bucket
;
uint64_t
end
=
start
+
registry
->
mruSize
;
...
...
source/libs/index/src/index_fst_util.c
浏览文件 @
192552be
...
...
@@ -64,6 +64,7 @@ uint8_t packSize(uint64_t n) {
uint64_t
unpackUint64
(
uint8_t
*
ch
,
uint8_t
sz
)
{
uint64_t
n
=
0
;
for
(
uint8_t
i
=
0
;
i
<
sz
;
i
++
)
{
//
n
=
n
|
(
ch
[
i
]
<<
(
8
*
i
));
}
return
n
;
...
...
@@ -133,9 +134,7 @@ bool fstSliceIsEmpty(FstSlice *s) { return s->str == NULL || s->str->len == 0 ||
uint8_t
*
fstSliceData
(
FstSlice
*
s
,
int32_t
*
size
)
{
FstString
*
str
=
s
->
str
;
if
(
size
!=
NULL
)
{
*
size
=
s
->
end
-
s
->
start
+
1
;
}
if
(
size
!=
NULL
)
{
*
size
=
s
->
end
-
s
->
start
+
1
;
}
return
str
->
data
+
s
->
start
;
}
void
fstSliceDestroy
(
FstSlice
*
s
)
{
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
192552be
...
...
@@ -22,14 +22,16 @@
#include "index_util.h"
#include "taosdef.h"
static
FORCE_INLINE
int
tfileWriteHeader
(
TFileWriter
*
writer
)
{}
static
FORCE_INLINE
int
tfileReadLoadHeader
(
TFileReader
*
reader
)
{
// TODO simple tfile header later
char
buf
[
TFILE_HADER_PRE_SIZE
];
char
*
p
=
buf
;
TFileReadHeader
*
header
=
&
reader
->
header
;
int64_t
nread
=
reader
->
ctx
->
read
(
reader
->
ctx
,
buf
,
TFILE_HADER_PRE_SIZE
);
char
buf
[
TFILE_HADER_PRE_SIZE
];
char
*
p
=
buf
;
int64_t
nread
=
reader
->
ctx
->
read
(
reader
->
ctx
,
buf
,
TFILE_HADER_PRE_SIZE
);
assert
(
nread
==
TFILE_HADER_PRE_SIZE
);
TFileHeader
*
header
=
&
reader
->
header
;
memcpy
(
&
header
->
suid
,
p
,
sizeof
(
header
->
suid
));
p
+=
sizeof
(
header
->
suid
);
...
...
@@ -47,9 +49,7 @@ static FORCE_INLINE int tfileReadLoadHeader(TFileReader *reader) {
};
static
int
tfileGetFileList
(
const
char
*
path
,
SArray
*
result
)
{
DIR
*
dir
=
opendir
(
path
);
if
(
NULL
==
dir
)
{
return
-
1
;
}
if
(
NULL
==
dir
)
{
return
-
1
;
}
struct
dirent
*
entry
;
while
((
entry
=
readdir
(
dir
))
!=
NULL
)
{
...
...
@@ -68,8 +68,10 @@ static void tfileDestroyFileName(void *elem) {
static
int
tfileCompare
(
const
void
*
a
,
const
void
*
b
)
{
const
char
*
aName
=
*
(
char
**
)
a
;
const
char
*
bName
=
*
(
char
**
)
b
;
size_t
aLen
=
strlen
(
aName
);
size_t
bLen
=
strlen
(
bName
);
size_t
aLen
=
strlen
(
aName
);
size_t
bLen
=
strlen
(
bName
);
return
strncmp
(
aName
,
bName
,
aLen
>
bLen
?
aLen
:
bLen
);
}
// tfile name suid-colId-version.tindex
...
...
@@ -92,9 +94,7 @@ static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) {
TFileCache
*
tfileCacheCreate
(
const
char
*
path
)
{
TFileCache
*
tcache
=
calloc
(
1
,
sizeof
(
TFileCache
));
if
(
tcache
==
NULL
)
{
return
NULL
;
}
if
(
tcache
==
NULL
)
{
return
NULL
;
}
tcache
->
tableCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
tcache
->
capacity
=
64
;
...
...
@@ -102,15 +102,14 @@ TFileCache *tfileCacheCreate(const char *path) {
SArray
*
files
=
taosArrayInit
(
4
,
sizeof
(
void
*
));
tfileGetFileList
(
path
,
files
);
taosArraySort
(
files
,
tfileCompare
);
uint64_t
suid
;
int32_t
colId
,
version
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
files
);
i
++
)
{
char
*
file
=
taosArrayGetP
(
files
,
i
);
uint64_t
suid
;
int
colId
,
version
;
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
&
colId
,
&
version
))
{
goto
End
;
char
*
file
=
taosArrayGetP
(
files
,
i
);
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
(
int
*
)
&
colId
,
(
int
*
)
&
version
))
{
indexInfo
(
"try parse invalid file: %s, skip it"
,
file
);
continue
;
}
WriterCtx
*
wc
=
writerCtxCreate
(
TFile
,
file
,
true
,
1024
*
64
);
if
(
wc
==
NULL
)
{
indexError
(
"failed to open index: %s"
,
file
);
...
...
@@ -122,6 +121,15 @@ TFileCache *tfileCacheCreate(const char *path) {
indexError
(
"failed to load index header, index Id: %s"
,
file
);
goto
End
;
}
// loader fst and validate it
TFileHeader
*
header
=
&
reader
->
header
;
TFileCacheKey
key
=
{
.
suid
=
header
->
suid
,
.
version
=
header
->
version
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
),
.
colType
=
header
->
colType
};
char
buf
[
128
]
=
{
0
};
tfileSerialCacheKey
(
&
key
,
buf
);
taosHashPut
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
),
&
reader
,
sizeof
(
void
*
));
}
taosArrayDestroyEx
(
files
,
tfileDestroyFileName
);
return
tcache
;
...
...
@@ -131,16 +139,14 @@ End:
return
NULL
;
}
void
tfileCacheDestroy
(
TFileCache
*
tcache
)
{
if
(
tcache
==
NULL
)
{
return
;
}
if
(
tcache
==
NULL
)
{
return
;
}
// free table cache
TFileReader
**
reader
=
taosHashIterate
(
tcache
->
tableCache
,
NULL
);
while
(
reader
)
{
TFileReader
*
p
=
*
reader
;
indexInfo
(
"drop table cache suid: %"
PRIu64
", colName: %s, colType: %d"
,
p
->
header
.
suid
,
p
->
header
.
colName
,
p
->
header
.
colType
);
indexInfo
(
"drop table cache suid: %"
PRIu64
", colName: %s, colType: %d"
,
p
->
header
.
suid
,
p
->
header
.
colName
,
p
->
header
.
colType
);
TFileReaderDestroy
(
p
);
reader
=
taosHashIterate
(
tcache
->
tableCache
,
reader
);
}
...
...
@@ -163,45 +169,67 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader)
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
)
{
TFileReader
*
reader
=
calloc
(
1
,
sizeof
(
TFileReader
));
if
(
reader
==
NULL
)
{
return
NULL
;
}
reader
->
ctx
=
ctx
;
if
(
reader
==
NULL
)
{
return
NULL
;
}
// T_REF_INC(reader);
reader
->
ctx
=
ctx
;
return
reader
;
}
void
TFileReaderDestroy
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
;
}
if
(
reader
==
NULL
)
{
return
;
}
// T_REF_INC(reader);
writerCtxDestroy
(
reader
->
ctx
);
free
(
reader
);
}
TFileWriter
*
tfileWriterCreate
(
const
char
*
suid
,
const
char
*
colName
);
void
tfileWriterDestroy
(
TFileWriter
*
tw
);
TFileWriter
*
tfileWriterCreate
(
WriterCtx
*
ctx
,
TFileHeader
*
header
)
{
// char pathBuf[128] = {0};
// sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version);
// TFileHeader header = {.suid = suid, .version = version, .colName = {0}, colType = colType};
// memcpy(header.colName, );
char
buf
[
TFILE_HADER_PRE_SIZE
];
int
len
=
TFILE_HADER_PRE_SIZE
;
if
(
len
!=
ctx
->
write
(
ctx
,
buf
,
len
))
{
indexError
(
"index: %"
PRIu64
" failed to write header info"
,
header
->
suid
);
return
NULL
;
}
TFileWriter
*
tw
=
calloc
(
1
,
sizeof
(
TFileWriter
));
if
(
tw
==
NULL
)
{
indexError
(
"index: % "
PRIu64
" failed to write header info"
);
return
NULL
;
}
return
tw
;
}
void
tfileWriterDestroy
(
TFileWriter
*
tw
)
{
if
(
tw
==
NULL
)
{
return
;
}
writerCtxDestroy
(
tw
->
ctx
);
free
(
tw
);
}
IndexTFile
*
indexTFileCreate
(
const
char
*
path
)
{
IndexTFile
*
tfile
=
calloc
(
1
,
sizeof
(
IndexTFile
));
tfile
->
cache
=
tfileCacheCreate
(
path
);
if
(
tfile
==
NULL
)
{
return
NULL
;
}
tfile
->
cache
=
tfileCacheCreate
(
path
);
return
tfile
;
}
void
IndexTFileDestroy
(
IndexTFile
*
tfile
)
{
free
(
tfile
);
}
int
indexTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
IndexTFile
*
pTfile
=
(
IndexTFile
*
)
tfile
;
if
(
pTfile
==
NULL
)
{
return
-
1
;
}
SIndexTerm
*
term
=
query
->
term
;
TFileCacheKey
key
=
{
.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
version
=
0
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
TFileCacheKey
key
=
{
.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
version
=
0
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
TFileReader
*
reader
=
tfileCacheGet
(
pTfile
->
cache
,
&
key
);
return
0
;
}
int
indexTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
TFileWriterOpt
wOpt
=
{
.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
,
.
version
=
1
};
TFileWriterOpt
wOpt
=
{.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
,
.
version
=
1
};
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录