Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
11eb2719
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
11eb2719
编写于
1月 05, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor code
上级
55676139
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
193 addition
and
124 deletion
+193
-124
source/libs/index/src/index.c
source/libs/index/src/index.c
+43
-20
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+48
-25
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+102
-79
未找到文件。
source/libs/index/src/index.c
浏览文件 @
11eb2719
...
...
@@ -63,7 +63,9 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
// pthread_once(&isInit, indexInit);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
#ifdef USE_LUCENE
index_t
*
index
=
index_open
(
path
);
...
...
@@ -73,7 +75,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#ifdef USE_INVERTED_INDEX
// sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx
->
tindex
=
indexTFileCreate
(
path
);
if
(
sIdx
->
tindex
==
NULL
)
{
goto
END
;
}
if
(
sIdx
->
tindex
==
NULL
)
{
goto
END
;
}
sIdx
->
colObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
cVersion
=
1
;
...
...
@@ -84,7 +88,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#endif
END:
if
(
sIdx
!=
NULL
)
{
indexClose
(
sIdx
);
}
if
(
sIdx
!=
NULL
)
{
indexClose
(
sIdx
);
}
*
index
=
NULL
;
return
-
1
;
...
...
@@ -100,7 +106,9 @@ void indexClose(SIndex* sIdx) {
void
*
iter
=
taosHashIterate
(
sIdx
->
colObj
,
NULL
);
while
(
iter
)
{
IndexCache
**
pCache
=
iter
;
if
(
*
pCache
)
{
indexCacheUnRef
(
*
pCache
);
}
if
(
*
pCache
)
{
indexCacheUnRef
(
*
pCache
);
}
iter
=
taosHashIterate
(
sIdx
->
colObj
,
iter
);
}
taosHashCleanup
(
sIdx
->
colObj
);
...
...
@@ -158,7 +166,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
assert
(
*
cache
!=
NULL
);
int
ret
=
indexCachePut
(
*
cache
,
p
,
uid
);
if
(
ret
!=
0
)
{
return
ret
;
}
if
(
ret
!=
0
)
{
return
ret
;
}
}
#endif
...
...
@@ -188,7 +198,9 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
int
tsz
=
0
;
index_multi_search
(
index
->
index
,
(
const
char
**
)
fields
,
(
const
char
**
)
keys
,
types
,
nQuery
,
opera
,
&
tResult
,
&
tsz
);
for
(
int
i
=
0
;
i
<
tsz
;
i
++
)
{
taosArrayPush
(
result
,
&
tResult
[
i
]);
}
for
(
int
i
=
0
;
i
<
tsz
;
i
++
)
{
taosArrayPush
(
result
,
&
tResult
[
i
]);
}
for
(
int
i
=
0
;
i
<
nQuery
;
i
++
)
{
free
(
fields
[
i
]);
...
...
@@ -245,7 +257,9 @@ void indexOptsDestroy(SIndexOpts* opts) {
*/
SIndexMultiTermQuery
*
indexMultiTermQueryCreate
(
EIndexOperatorType
opera
)
{
SIndexMultiTermQuery
*
p
=
(
SIndexMultiTermQuery
*
)
malloc
(
sizeof
(
SIndexMultiTermQuery
));
if
(
p
==
NULL
)
{
return
NULL
;
}
if
(
p
==
NULL
)
{
return
NULL
;
}
p
->
opera
=
opera
;
p
->
query
=
taosArrayInit
(
4
,
sizeof
(
SIndexTermQuery
));
return
p
;
...
...
@@ -267,7 +281,9 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
SIndexTerm
*
indexTermCreate
(
int64_t
suid
,
SIndexOperOnColumn
oper
,
uint8_t
colType
,
const
char
*
colName
,
int32_t
nColName
,
const
char
*
colVal
,
int32_t
nColVal
)
{
SIndexTerm
*
t
=
(
SIndexTerm
*
)
calloc
(
1
,
(
sizeof
(
SIndexTerm
)));
if
(
t
==
NULL
)
{
return
NULL
;
}
if
(
t
==
NULL
)
{
return
NULL
;
}
t
->
suid
=
suid
;
t
->
operType
=
oper
;
...
...
@@ -340,7 +356,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
return
0
;
}
static
void
indexInterResultsDestroy
(
SArray
*
results
)
{
if
(
results
==
NULL
)
{
return
;
}
if
(
results
==
NULL
)
{
return
;
}
size_t
sz
=
taosArrayGetSize
(
results
);
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -396,16 +414,22 @@ static void indexDestroyTempResult(SArray* result) {
taosArrayDestroy
(
result
);
}
int
indexFlushCacheTFile
(
SIndex
*
sIdx
,
void
*
cache
)
{
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
indexInfo
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
IndexCache
*
pCache
=
(
IndexCache
*
)
cache
;
TFileReader
*
pReader
=
tfileGetReaderByCol
(
sIdx
->
tindex
,
pCache
->
suid
,
pCache
->
colName
);
if
(
pReader
==
NULL
)
{
indexWarn
(
"empty tfile reader found"
);
}
if
(
pReader
==
NULL
)
{
indexWarn
(
"empty tfile reader found"
);
}
// handle flush
Iterate
*
cacheIter
=
indexCacheIteratorCreate
(
pCache
);
Iterate
*
tfileIter
=
tfileIteratorCreate
(
pReader
);
if
(
tfileIter
==
NULL
)
{
indexWarn
(
"empty tfile reader iterator"
);
}
if
(
tfileIter
==
NULL
)
{
indexWarn
(
"empty tfile reader iterator"
);
}
SArray
*
result
=
taosArrayInit
(
1024
,
sizeof
(
void
*
));
...
...
@@ -452,10 +476,6 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
while
(
tn
==
true
)
{
IterateValue
*
tv
=
tfileIter
->
getValue
(
tfileIter
);
TFileValue
*
tfv
=
tfileValueCreate
(
tv
->
colVal
);
if
(
tv
->
val
==
NULL
)
{
// HO
printf
(
"NO...."
);
}
taosArrayAddAll
(
tfv
->
tableId
,
tv
->
val
);
indexMergeSameKey
(
result
,
tfv
);
tn
=
tfileIter
->
next
(
tfileIter
);
...
...
@@ -476,7 +496,9 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
taosArrayDestroy
(
value
->
val
);
value
->
val
=
NULL
;
}
else
{
if
(
value
->
val
!=
NULL
)
{
taosArrayClear
(
value
->
val
);
}
if
(
value
->
val
!=
NULL
)
{
taosArrayClear
(
value
->
val
);
}
}
free
(
value
->
colVal
);
value
->
colVal
=
NULL
;
...
...
@@ -499,11 +521,12 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
tfileWriterClose
(
tw
);
TFileReader
*
reader
=
tfileReaderOpen
(
sIdx
->
path
,
cache
->
suid
,
version
,
cache
->
colName
);
if
(
reader
==
NULL
)
{
goto
END
;
}
if
(
reader
==
NULL
)
{
goto
END
;
}
TFileHeader
*
header
=
&
reader
->
header
;
ICacheKey
key
=
{
.
suid
=
cache
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
),
.
colType
=
header
->
colType
};
ICacheKey
key
=
{.
suid
=
cache
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
)};
pthread_mutex_lock
(
&
sIdx
->
mtx
);
IndexTFile
*
ifile
=
(
IndexTFile
*
)
sIdx
->
tindex
;
...
...
source/libs/index/src/index_cache.c
浏览文件 @
11eb2719
...
...
@@ -21,10 +21,6 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
// sizeof(p->operType))
static
void
indexMemRef
(
MemTable
*
tbl
);
static
void
indexMemUnRef
(
MemTable
*
tbl
);
...
...
@@ -119,7 +115,9 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
tSkipListDestroy
(
slt
);
}
void
indexCacheDestroyImm
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
if
(
cache
==
NULL
)
{
return
;
}
MemTable
*
tbl
=
NULL
;
pthread_mutex_lock
(
&
cache
->
mtx
);
...
...
@@ -132,7 +130,9 @@ void indexCacheDestroyImm(IndexCache* cache) {
}
void
indexCacheDestroy
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
if
(
pCache
==
NULL
)
{
return
;
}
if
(
pCache
==
NULL
)
{
return
;
}
indexMemUnRef
(
pCache
->
mem
);
indexMemUnRef
(
pCache
->
imm
);
free
(
pCache
->
colName
);
...
...
@@ -142,7 +142,9 @@ void indexCacheDestroy(void* cache) {
Iterate
*
indexCacheIteratorCreate
(
IndexCache
*
cache
)
{
Iterate
*
iiter
=
calloc
(
1
,
sizeof
(
Iterate
));
if
(
iiter
==
NULL
)
{
return
NULL
;
}
if
(
iiter
==
NULL
)
{
return
NULL
;
}
pthread_mutex_lock
(
&
cache
->
mtx
);
...
...
@@ -160,7 +162,9 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
return
iiter
;
}
void
indexCacheIteratorDestroy
(
Iterate
*
iter
)
{
if
(
iter
==
NULL
)
{
return
;
}
if
(
iter
==
NULL
)
{
return
;
}
tSkipListDestroyIter
(
iter
->
iter
);
iterateValueDestroy
(
&
iter
->
val
,
true
);
free
(
iter
);
...
...
@@ -198,13 +202,17 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
if
(
cache
==
NULL
)
{
return
-
1
;
}
IndexCache
*
pCache
=
cache
;
indexCacheRef
(
pCache
);
// encode data
CacheTerm
*
ct
=
calloc
(
1
,
sizeof
(
CacheTerm
));
if
(
cache
==
NULL
)
{
return
-
1
;
}
if
(
cache
==
NULL
)
{
return
-
1
;
}
// set up key
ct
->
colType
=
term
->
colType
;
ct
->
colVal
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
(
term
->
nColVal
+
1
));
...
...
@@ -235,7 +243,9 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
}
static
int
indexQueryMem
(
MemTable
*
mem
,
CacheTerm
*
ct
,
EIndexQueryType
qtype
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
mem
==
NULL
)
{
return
0
;
}
if
(
mem
==
NULL
)
{
return
0
;
}
char
*
key
=
getIndexKey
(
ct
);
SSkipListIterator
*
iter
=
tSkipListCreateIterFromVal
(
mem
->
mem
,
key
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
...
...
@@ -261,7 +271,9 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
return
0
;
}
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
cache
==
NULL
)
{
return
0
;
}
if
(
cache
==
NULL
)
{
return
0
;
}
IndexCache
*
pCache
=
cache
;
MemTable
*
mem
=
NULL
,
*
imm
=
NULL
;
...
...
@@ -275,14 +287,12 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
SIndexTerm
*
term
=
query
->
term
;
EIndexQueryType
qtype
=
query
->
qType
;
CacheTerm
ct
=
{.
colVal
=
term
->
colVal
,
.
version
=
atomic_load_32
(
&
pCache
->
version
)};
// indexCacheDebug(pCache);
int
ret
=
indexQueryMem
(
mem
,
&
ct
,
qtype
,
result
,
s
);
if
(
ret
==
0
&&
*
s
!=
kTypeDeletion
)
{
// continue search in imm
ret
=
indexQueryMem
(
imm
,
&
ct
,
qtype
,
result
,
s
);
}
// cacheTermDestroy(ct);
indexMemUnRef
(
mem
);
indexMemUnRef
(
imm
);
...
...
@@ -291,23 +301,33 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
}
void
indexCacheRef
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
if
(
cache
==
NULL
)
{
return
;
}
int
ref
=
T_REF_INC
(
cache
);
UNUSED
(
ref
);
}
void
indexCacheUnRef
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
if
(
cache
==
NULL
)
{
return
;
}
int
ref
=
T_REF_DEC
(
cache
);
if
(
ref
==
0
)
{
indexCacheDestroy
(
cache
);
}
if
(
ref
==
0
)
{
indexCacheDestroy
(
cache
);
}
}
void
indexMemRef
(
MemTable
*
tbl
)
{
if
(
tbl
==
NULL
)
{
return
;
}
if
(
tbl
==
NULL
)
{
return
;
}
int
ref
=
T_REF_INC
(
tbl
);
UNUSED
(
ref
);
}
void
indexMemUnRef
(
MemTable
*
tbl
)
{
if
(
tbl
==
NULL
)
{
return
;
}
if
(
tbl
==
NULL
)
{
return
;
}
int
ref
=
T_REF_DEC
(
tbl
);
if
(
ref
==
0
)
{
SSkipList
*
slt
=
tbl
->
mem
;
...
...
@@ -317,7 +337,9 @@ void indexMemUnRef(MemTable* tbl) {
}
static
void
cacheTermDestroy
(
CacheTerm
*
ct
)
{
if
(
ct
==
NULL
)
{
return
;
}
if
(
ct
==
NULL
)
{
return
;
}
free
(
ct
->
colVal
);
free
(
ct
);
}
...
...
@@ -332,7 +354,9 @@ static int32_t compareKey(const void* l, const void* r) {
// compare colVal
int32_t
cmp
=
strcmp
(
lt
->
colVal
,
rt
->
colVal
);
if
(
cmp
==
0
)
{
return
rt
->
version
-
lt
->
version
;
}
if
(
cmp
==
0
)
{
return
rt
->
version
-
lt
->
version
;
}
return
cmp
;
}
...
...
@@ -352,11 +376,10 @@ static void doMergeWork(SSchedMsg* msg) {
}
static
bool
indexCacheIteratorNext
(
Iterate
*
itera
)
{
SSkipListIterator
*
iter
=
itera
->
iter
;
if
(
iter
==
NULL
)
{
return
false
;
}
IterateValue
*
iv
=
&
itera
->
val
;
if
(
iv
->
colVal
!=
NULL
&&
iv
->
val
!=
NULL
)
{
// indexError("value in cache: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
if
(
iter
==
NULL
)
{
return
false
;
}
IterateValue
*
iv
=
&
itera
->
val
;
iterateValueDestroy
(
iv
,
false
);
bool
next
=
tSkipListIterNext
(
iter
);
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
11eb2719
...
...
@@ -55,15 +55,14 @@ static void tfileGenFileFullName(char* fullname, const char* path, uint64_t s
TFileCache
*
tfileCacheCreate
(
const
char
*
path
)
{
TFileCache
*
tcache
=
calloc
(
1
,
sizeof
(
TFileCache
));
if
(
tcache
==
NULL
)
{
return
NULL
;
}
if
(
tcache
==
NULL
)
{
return
NULL
;
}
tcache
->
tableCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
tcache
->
capacity
=
64
;
SArray
*
files
=
tfileGetFileList
(
path
);
uint64_t
suid
;
int32_t
colId
,
version
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
files
);
i
++
)
{
char
*
file
=
taosArrayGetP
(
files
,
i
);
...
...
@@ -74,12 +73,13 @@ TFileCache* tfileCacheCreate(const char* path) {
}
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
if
(
reader
==
NULL
)
{
goto
End
;
}
if
(
reader
==
NULL
)
{
goto
End
;
}
TFileHeader
*
header
=
&
reader
->
header
;
ICacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
)};
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
)};
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
taosHashPut
(
tcache
->
tableCache
,
buf
,
sz
,
&
reader
,
sizeof
(
void
*
));
...
...
@@ -93,7 +93,9 @@ End:
return
NULL
;
}
void
tfileCacheDestroy
(
TFileCache
*
tcache
)
{
if
(
tcache
==
NULL
)
{
return
;
}
if
(
tcache
==
NULL
)
{
return
;
}
// free table cache
TFileReader
**
reader
=
taosHashIterate
(
tcache
->
tableCache
,
NULL
);
...
...
@@ -114,7 +116,9 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
int32_t
sz
=
indexSerialCacheKey
(
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
TFileReader
**
reader
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
sz
);
if
(
reader
==
NULL
)
{
return
NULL
;
}
if
(
reader
==
NULL
)
{
return
NULL
;
}
tfileReaderRef
(
*
reader
);
return
*
reader
;
...
...
@@ -137,7 +141,9 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
}
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
)
{
TFileReader
*
reader
=
calloc
(
1
,
sizeof
(
TFileReader
));
if
(
reader
==
NULL
)
{
return
NULL
;
}
if
(
reader
==
NULL
)
{
return
NULL
;
}
// T_REF_INC(reader);
reader
->
ctx
=
ctx
;
...
...
@@ -157,7 +163,9 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
return
reader
;
}
void
tfileReaderDestroy
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
;
}
if
(
reader
==
NULL
)
{
return
;
}
// T_REF_INC(reader);
fstDestroy
(
reader
->
fst
);
writerCtxDestroy
(
reader
->
ctx
,
reader
->
remove
);
...
...
@@ -197,7 +205,9 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
tfileGenFileFullName
(
fullname
,
path
,
suid
,
colName
,
version
);
// indexInfo("open write file name %s", fullname);
WriterCtx
*
wcx
=
writerCtxCreate
(
TFile
,
fullname
,
false
,
1024
*
1024
*
64
);
if
(
wcx
==
NULL
)
{
return
NULL
;
}
if
(
wcx
==
NULL
)
{
return
NULL
;
}
TFileHeader
tfh
=
{
0
};
tfh
.
suid
=
suid
;
...
...
@@ -212,24 +222,15 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
tfileGenFileFullName
(
fullname
,
path
,
suid
,
colName
,
version
);
WriterCtx
*
wc
=
writerCtxCreate
(
TFile
,
fullname
,
true
,
1024
*
1024
*
1024
);
// indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
if
(
wc
==
NULL
)
{
return
NULL
;
}
indexInfo
(
"open read file name:%s, size: %d"
,
wc
->
file
.
buf
,
wc
->
file
.
size
);
if
(
wc
==
NULL
)
{
return
NULL
;
}
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
return
reader
;
}
TFileWriter
*
tfileWriterCreate
(
WriterCtx
*
ctx
,
TFileHeader
*
header
)
{
// char pathBuf[128] = {0};
// sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version);
// TFileHeader header = {.suid = suid, .version = version, .colName = {0}, colType = colType};
// memcpy(header.colName, );
// char buf[TFILE_HADER_PRE_SIZE];
// int len = TFILE_HADER_PRE_SIZE;
// if (len != ctx->write(ctx, buf, len)) {
// indexError("index: %" PRIu64 " failed to write header info", header->suid);
// return NULL;
//}
TFileWriter
*
tw
=
calloc
(
1
,
sizeof
(
TFileWriter
));
if
(
tw
==
NULL
)
{
indexError
(
"index: %"
PRIu64
" failed to alloc TFilerWriter"
,
header
->
suid
);
...
...
@@ -278,34 +279,14 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
// check buf has enough space or not
int32_t
ttsz
=
TF_TABLE_TATOAL_SIZE
(
tbsz
);
// if (offset + ttsz >= bufLimit) {
// // batch write
// indexInfo("offset: %d, ttsz: %d", offset, ttsz);
// // std::cout << "offset: " << offset << std::endl;
// // std::cout << "ttsz:" << ttsz < < < std::endl;
// tw->ctx->write(tw->ctx, buf, offset);
// offset = 0;
// memset(buf, 0, bufLimit);
// p = buf;
//}
// if (ttsz >= bufLimit) {
//}
char
*
buf
=
calloc
(
1
,
ttsz
*
sizeof
(
char
));
char
*
p
=
buf
;
tfileSerialTableIdsToBuf
(
p
,
v
->
tableId
);
tw
->
ctx
->
write
(
tw
->
ctx
,
buf
,
ttsz
);
// offset += ttsz;
// p = buf + offset;
// set up value offset
v
->
offset
=
tw
->
offset
;
tw
->
offset
+=
ttsz
;
free
(
buf
);
}
// if (offset != 0) {
// write reversed data in buf to tindex
// tw->ctx->write(tw->ctx, buf, offset);
//}
// tfree(buf);
tw
->
fb
=
fstBuilderCreate
(
tw
->
ctx
,
0
);
if
(
tw
->
fb
==
NULL
)
{
...
...
@@ -331,19 +312,25 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
return
0
;
}
void
tfileWriterClose
(
TFileWriter
*
tw
)
{
if
(
tw
==
NULL
)
{
return
;
}
if
(
tw
==
NULL
)
{
return
;
}
writerCtxDestroy
(
tw
->
ctx
,
false
);
free
(
tw
);
}
void
tfileWriterDestroy
(
TFileWriter
*
tw
)
{
if
(
tw
==
NULL
)
{
return
;
}
if
(
tw
==
NULL
)
{
return
;
}
writerCtxDestroy
(
tw
->
ctx
,
false
);
free
(
tw
);
}
IndexTFile
*
indexTFileCreate
(
const
char
*
path
)
{
TFileCache
*
cache
=
tfileCacheCreate
(
path
);
if
(
cache
==
NULL
)
{
return
NULL
;
}
if
(
cache
==
NULL
)
{
return
NULL
;
}
IndexTFile
*
tfile
=
calloc
(
1
,
sizeof
(
IndexTFile
));
if
(
tfile
==
NULL
)
{
...
...
@@ -355,21 +342,27 @@ IndexTFile* indexTFileCreate(const char* path) {
return
tfile
;
}
void
indexTFileDestroy
(
IndexTFile
*
tfile
)
{
if
(
tfile
==
NULL
)
{
return
;
}
if
(
tfile
==
NULL
)
{
return
;
}
tfileCacheDestroy
(
tfile
->
cache
);
free
(
tfile
);
}
int
indexTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
int
ret
=
-
1
;
if
(
tfile
==
NULL
)
{
return
ret
;
}
if
(
tfile
==
NULL
)
{
return
ret
;
}
IndexTFile
*
pTfile
=
(
IndexTFile
*
)
tfile
;
SIndexTerm
*
term
=
query
->
term
;
ICacheKey
key
=
{.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
TFileReader
*
reader
=
tfileCacheGet
(
pTfile
->
cache
,
&
key
);
if
(
reader
==
NULL
)
{
return
0
;
}
if
(
reader
==
NULL
)
{
return
0
;
}
return
tfileReaderSearch
(
reader
,
query
,
result
);
}
...
...
@@ -391,7 +384,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
TFileFstIter
*
tIter
=
iiter
->
iter
;
StreamWithStateResult
*
rt
=
streamWithStateNextWith
(
tIter
->
st
,
NULL
);
if
(
rt
==
NULL
)
{
return
false
;
}
if
(
rt
==
NULL
)
{
return
false
;
}
int32_t
sz
=
0
;
char
*
ch
=
(
char
*
)
fstSliceData
(
&
rt
->
data
,
&
sz
);
...
...
@@ -401,7 +396,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
offset
=
(
uint64_t
)(
rt
->
out
.
out
);
swsResultDestroy
(
rt
);
// set up iterate value
if
(
tfileReaderLoadTableIds
(
tIter
->
rdr
,
offset
,
iv
->
val
)
!=
0
)
{
return
false
;
}
if
(
tfileReaderLoadTableIds
(
tIter
->
rdr
,
offset
,
iv
->
val
)
!=
0
)
{
return
false
;
}
iv
->
colVal
=
colVal
;
return
true
;
...
...
@@ -412,7 +409,9 @@ static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
static
TFileFstIter
*
tfileFstIteratorCreate
(
TFileReader
*
reader
)
{
TFileFstIter
*
tIter
=
calloc
(
1
,
sizeof
(
TFileFstIter
));
if
(
tIter
==
NULL
)
{
return
NULL
;
}
if
(
tIter
==
NULL
)
{
return
NULL
;
}
tIter
->
ctx
=
automCtxCreate
(
NULL
,
AUTOMATION_ALWAYS
);
tIter
->
fb
=
fstSearch
(
reader
->
fst
,
tIter
->
ctx
);
...
...
@@ -422,7 +421,9 @@ static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
}
Iterate
*
tfileIteratorCreate
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
NULL
;
}
if
(
reader
==
NULL
)
{
return
NULL
;
}
Iterate
*
iter
=
calloc
(
1
,
sizeof
(
Iterate
));
iter
->
iter
=
tfileFstIteratorCreate
(
reader
);
...
...
@@ -437,7 +438,9 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
return
iter
;
}
void
tfileIteratorDestroy
(
Iterate
*
iter
)
{
if
(
iter
==
NULL
)
{
return
;
}
if
(
iter
==
NULL
)
{
return
;
}
IterateValue
*
iv
=
&
iter
->
val
;
iterateValueDestroy
(
iv
,
true
);
...
...
@@ -452,7 +455,9 @@ void tfileIteratorDestroy(Iterate* iter) {
}
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
uint64_t
suid
,
char
*
colName
)
{
if
(
tf
==
NULL
)
{
return
NULL
;
}
if
(
tf
==
NULL
)
{
return
NULL
;
}
ICacheKey
key
=
{.
suid
=
suid
,
.
colType
=
TSDB_DATA_TYPE_BINARY
,
.
colName
=
colName
,
.
nColName
=
strlen
(
colName
)};
return
tfileCacheGet
(
tf
->
cache
,
&
key
);
}
...
...
@@ -464,7 +469,9 @@ static int tfileUidCompare(const void* a, const void* b) {
}
static
int
tfileStrCompare
(
const
void
*
a
,
const
void
*
b
)
{
int
ret
=
strcmp
((
char
*
)
a
,
(
char
*
)
b
);
if
(
ret
==
0
)
{
return
ret
;
}
if
(
ret
==
0
)
{
return
ret
;
}
return
ret
<
0
?
-
1
:
1
;
}
...
...
@@ -479,13 +486,17 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
TFileValue
*
tfileValueCreate
(
char
*
val
)
{
TFileValue
*
tf
=
calloc
(
1
,
sizeof
(
TFileValue
));
if
(
tf
==
NULL
)
{
return
NULL
;
}
if
(
tf
==
NULL
)
{
return
NULL
;
}
tf
->
colVal
=
tstrdup
(
val
);
tf
->
tableId
=
taosArrayInit
(
32
,
sizeof
(
uint64_t
));
return
tf
;
}
int
tfileValuePush
(
TFileValue
*
tf
,
uint64_t
val
)
{
if
(
tf
==
NULL
)
{
return
-
1
;
}
if
(
tf
==
NULL
)
{
return
-
1
;
}
taosArrayPush
(
tf
->
tableId
,
&
val
);
return
0
;
}
...
...
@@ -506,7 +517,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
static
int
tfileWriteFstOffset
(
TFileWriter
*
tw
,
int32_t
offset
)
{
int32_t
fstOffset
=
offset
+
sizeof
(
tw
->
header
.
fstOffset
);
tw
->
header
.
fstOffset
=
fstOffset
;
if
(
sizeof
(
fstOffset
)
!=
tw
->
ctx
->
write
(
tw
->
ctx
,
(
char
*
)
&
fstOffset
,
sizeof
(
fstOffset
)))
{
return
-
1
;
}
if
(
sizeof
(
fstOffset
)
!=
tw
->
ctx
->
write
(
tw
->
ctx
,
(
char
*
)
&
fstOffset
,
sizeof
(
fstOffset
)))
{
return
-
1
;
}
tw
->
offset
+=
sizeof
(
fstOffset
);
return
0
;
}
...
...
@@ -517,7 +530,9 @@ static int tfileWriteHeader(TFileWriter* writer) {
memcpy
(
buf
,
(
char
*
)
header
,
sizeof
(
buf
));
int
nwrite
=
writer
->
ctx
->
write
(
writer
->
ctx
,
buf
,
sizeof
(
buf
));
if
(
sizeof
(
buf
)
!=
nwrite
)
{
return
-
1
;
}
if
(
sizeof
(
buf
)
!=
nwrite
)
{
return
-
1
;
}
writer
->
offset
=
nwrite
;
return
0
;
}
...
...
@@ -559,7 +574,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
static
int
FST_MAX_SIZE
=
64
*
1024
*
1024
;
char
*
buf
=
calloc
(
1
,
sizeof
(
char
)
*
FST_MAX_SIZE
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
if
(
buf
==
NULL
)
{
return
-
1
;
}
WriterCtx
*
ctx
=
reader
->
ctx
;
int32_t
nread
=
ctx
->
readFrom
(
ctx
,
buf
,
FST_MAX_SIZE
,
reader
->
header
.
fstOffset
);
...
...
@@ -584,23 +601,31 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
int32_t
total
=
sizeof
(
uint64_t
)
*
nid
;
char
*
buf
=
calloc
(
1
,
total
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
if
(
buf
==
NULL
)
{
return
-
1
;
}
nread
=
ctx
->
readFrom
(
ctx
,
buf
,
total
,
offset
+
sizeof
(
nid
));
assert
(
total
==
nread
);
for
(
int32_t
i
=
0
;
i
<
nid
;
i
++
)
{
taosArrayPush
(
result
,
(
uint64_t
*
)
buf
+
i
);
}
for
(
int32_t
i
=
0
;
i
<
nid
;
i
++
)
{
taosArrayPush
(
result
,
(
uint64_t
*
)
buf
+
i
);
}
free
(
buf
);
return
0
;
}
void
tfileReaderRef
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
;
}
if
(
reader
==
NULL
)
{
return
;
}
int
ref
=
T_REF_INC
(
reader
);
UNUSED
(
ref
);
}
void
tfileReaderUnRef
(
TFileReader
*
reader
)
{
if
(
reader
==
NULL
)
{
return
;
}
if
(
reader
==
NULL
)
{
return
;
}
int
ref
=
T_REF_DEC
(
reader
);
if
(
ref
==
0
)
{
// do nothing
...
...
@@ -616,11 +641,15 @@ static SArray* tfileGetFileList(const char* path) {
uint32_t
version
;
DIR
*
dir
=
opendir
(
path
);
if
(
NULL
==
dir
)
{
return
NULL
;
}
if
(
NULL
==
dir
)
{
return
NULL
;
}
struct
dirent
*
entry
;
while
((
entry
=
readdir
(
dir
))
!=
NULL
)
{
char
*
file
=
entry
->
d_name
;
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
buf
,
&
version
))
{
continue
;
}
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
buf
,
&
version
))
{
continue
;
}
size_t
len
=
strlen
(
path
)
+
1
+
strlen
(
file
)
+
1
;
char
*
buf
=
calloc
(
1
,
len
);
...
...
@@ -643,15 +672,9 @@ static void tfileDestroyFileName(void* elem) {
free
(
p
);
}
static
int
tfileCompare
(
const
void
*
a
,
const
void
*
b
)
{
const
char
*
aName
=
*
(
char
**
)
a
;
const
char
*
bName
=
*
(
char
**
)
b
;
size_t
aLen
=
strlen
(
aName
);
size_t
bLen
=
strlen
(
bName
);
int
ret
=
strncmp
(
aName
,
bName
,
aLen
>
bLen
?
aLen
:
bLen
);
if
(
ret
==
0
)
{
return
ret
;
}
return
ret
<
0
?
-
1
:
1
;
const
char
*
as
=
*
(
char
**
)
a
;
const
char
*
bs
=
*
(
char
**
)
b
;
return
strcmp
(
as
,
bs
);
}
static
int
tfileParseFileName
(
const
char
*
filename
,
uint64_t
*
suid
,
char
*
col
,
int
*
version
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录