Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a4ad2b3f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a4ad2b3f
编写于
6月 10, 2022
作者:
dengyihao
提交者:
GitHub
6月 10, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13656 from taosdata/enh/refactorIdx
refactor: index module
上级
f974d625
f057cd19
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
148 addition
and
148 deletion
+148
-148
source/libs/index/inc/indexCache.h
source/libs/index/inc/indexCache.h
+11
-11
source/libs/index/inc/indexComm.h
source/libs/index/inc/indexComm.h
+2
-2
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+12
-12
source/libs/index/inc/indexTfile.h
source/libs/index/inc/indexTfile.h
+4
-4
source/libs/index/src/index.c
source/libs/index/src/index.c
+34
-34
source/libs/index/src/indexCache.c
source/libs/index/src/indexCache.c
+52
-52
source/libs/index/src/indexComm.c
source/libs/index/src/indexComm.c
+11
-11
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+1
-1
source/libs/index/src/indexJson.c
source/libs/index/src/indexJson.c
+2
-2
source/libs/index/src/indexTfile.c
source/libs/index/src/indexTfile.c
+14
-14
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+5
-5
未找到文件。
source/libs/index/inc/indexCache.h
浏览文件 @
a4ad2b3f
...
...
@@ -62,25 +62,25 @@ typedef struct CacheTerm {
}
CacheTerm
;
//
IndexCache
*
i
nde
xCacheCreate
(
SIndex
*
idx
,
uint64_t
suid
,
const
char
*
colName
,
int8_t
type
);
IndexCache
*
i
d
xCacheCreate
(
SIndex
*
idx
,
uint64_t
suid
,
const
char
*
colName
,
int8_t
type
);
void
i
nde
xCacheForceToMerge
(
void
*
cache
);
void
i
nde
xCacheDestroy
(
void
*
cache
);
void
i
nde
xCacheBroadcast
(
void
*
cache
);
void
i
nde
xCacheWait
(
void
*
cache
);
void
i
d
xCacheForceToMerge
(
void
*
cache
);
void
i
d
xCacheDestroy
(
void
*
cache
);
void
i
d
xCacheBroadcast
(
void
*
cache
);
void
i
d
xCacheWait
(
void
*
cache
);
Iterate
*
i
nde
xCacheIteratorCreate
(
IndexCache
*
cache
);
Iterate
*
i
d
xCacheIteratorCreate
(
IndexCache
*
cache
);
void
idxCacheIteratorDestroy
(
Iterate
*
iiter
);
int
i
nde
xCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
);
int
i
d
xCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
);
// int indexCacheGet(void *cache, uint64_t *rst);
int
i
nde
xCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
,
STermValueType
*
s
);
int
i
d
xCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
,
STermValueType
*
s
);
void
i
nde
xCacheRef
(
IndexCache
*
cache
);
void
i
nde
xCacheUnRef
(
IndexCache
*
cache
);
void
i
d
xCacheRef
(
IndexCache
*
cache
);
void
i
d
xCacheUnRef
(
IndexCache
*
cache
);
void
i
nde
xCacheDebug
(
IndexCache
*
cache
);
void
i
d
xCacheDebug
(
IndexCache
*
cache
);
void
idxCacheDestroyImm
(
IndexCache
*
cache
);
#ifdef __cplusplus
...
...
source/libs/index/inc/indexComm.h
浏览文件 @
a4ad2b3f
...
...
@@ -34,11 +34,11 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef
TExeCond
(
*
_cache_range_compare
)(
void
*
a
,
void
*
b
,
int8_t
type
);
__compar_fn_t
i
nde
xGetCompar
(
int8_t
type
);
__compar_fn_t
i
d
xGetCompar
(
int8_t
type
);
TExeCond
tCompare
(
__compar_fn_t
func
,
int8_t
cmpType
,
void
*
a
,
void
*
b
,
int8_t
dType
);
TExeCond
tDoCompare
(
__compar_fn_t
func
,
int8_t
cmpType
,
void
*
a
,
void
*
b
);
_cache_range_compare
i
nde
xGetCompare
(
RangeType
ty
);
_cache_range_compare
i
d
xGetCompare
(
RangeType
ty
);
int32_t
idxConvertData
(
void
*
src
,
int8_t
type
,
void
**
dst
);
int32_t
idxConvertDataToStr
(
void
*
src
,
int8_t
type
,
void
**
dst
);
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
a4ad2b3f
...
...
@@ -133,24 +133,24 @@ typedef struct TFileCacheKey {
}
ICacheKey
;
int
idxFlushCacheToTFile
(
SIndex
*
sIdx
,
void
*
,
bool
quit
);
int64_t
i
nde
xAddRef
(
void
*
p
);
int32_t
i
nde
xRemoveRef
(
int64_t
ref
);
void
i
nde
xAcquireRef
(
int64_t
ref
);
void
i
nde
xReleaseRef
(
int64_t
ref
);
int64_t
i
d
xAddRef
(
void
*
p
);
int32_t
i
d
xRemoveRef
(
int64_t
ref
);
void
i
d
xAcquireRef
(
int64_t
ref
);
void
i
d
xReleaseRef
(
int64_t
ref
);
int32_t
i
nde
xSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
);
int32_t
i
d
xSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
#define I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
#define I
D
X_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
#define I
NDE
X_TYPE_GET_TYPE(ty) (ty & 0x0F)
#define I
D
X_TYPE_GET_TYPE(ty) (ty & 0x0F)
#define I
NDE
X_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
do {
\
uint8_t oldTy = ty;
\
ty = (ty >> 4) | exTy;
\
ty = (ty << 4) | oldTy;
\
#define I
D
X_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
do { \
uint8_t oldTy = ty; \
ty = (ty >> 4) | exTy; \
ty = (ty << 4) | oldTy; \
} while (0)
#ifdef __cplusplus
...
...
source/libs/index/inc/indexTfile.h
浏览文件 @
a4ad2b3f
...
...
@@ -117,10 +117,10 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int
tfileWriterFinish
(
TFileWriter
*
tw
);
//
IndexTFile
*
i
nde
xTFileCreate
(
const
char
*
path
);
void
i
nde
xTFileDestroy
(
IndexTFile
*
tfile
);
int
i
nde
xTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
);
int
i
nde
xTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
);
IndexTFile
*
i
d
xTFileCreate
(
const
char
*
path
);
void
i
d
xTFileDestroy
(
IndexTFile
*
tfile
);
int
i
d
xTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
);
int
i
d
xTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
);
Iterate
*
tfileIteratorCreate
(
TFileReader
*
reader
);
void
tfileIteratorDestroy
(
Iterate
*
iterator
);
...
...
source/libs/index/src/index.c
浏览文件 @
a4ad2b3f
...
...
@@ -90,7 +90,7 @@ static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateV
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
static
void
i
nde
xPost
(
void
*
idx
)
{
static
void
i
d
xPost
(
void
*
idx
)
{
SIndex
*
pIdx
=
idx
;
tsem_post
(
&
pIdx
->
sem
);
}
...
...
@@ -106,8 +106,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
return
-
1
;
}
// sIdx->cache = (void*)i
nde
xCacheCreate(sIdx);
sIdx
->
tindex
=
i
nde
xTFileCreate
(
path
);
// sIdx->cache = (void*)i
d
xCacheCreate(sIdx);
sIdx
->
tindex
=
i
d
xTFileCreate
(
path
);
if
(
sIdx
->
tindex
==
NULL
)
{
goto
END
;
}
...
...
@@ -118,8 +118,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
taosThreadMutexInit
(
&
sIdx
->
mtx
,
NULL
);
tsem_init
(
&
sIdx
->
sem
,
0
,
0
);
sIdx
->
refId
=
i
nde
xAddRef
(
sIdx
);
i
nde
xAcquireRef
(
sIdx
->
refId
);
sIdx
->
refId
=
i
d
xAddRef
(
sIdx
);
i
d
xAcquireRef
(
sIdx
->
refId
);
*
index
=
sIdx
;
return
0
;
...
...
@@ -136,7 +136,7 @@ void indexDestroy(void* handle) {
SIndex
*
sIdx
=
handle
;
taosThreadMutexDestroy
(
&
sIdx
->
mtx
);
tsem_destroy
(
&
sIdx
->
sem
);
i
nde
xTFileDestroy
(
sIdx
->
tindex
);
i
d
xTFileDestroy
(
sIdx
->
tindex
);
taosMemoryFree
(
sIdx
->
path
);
taosMemoryFree
(
sIdx
);
return
;
...
...
@@ -147,33 +147,33 @@ void indexClose(SIndex* sIdx) {
void
*
iter
=
taosHashIterate
(
sIdx
->
colObj
,
NULL
);
while
(
iter
)
{
IndexCache
**
pCache
=
iter
;
i
nde
xCacheForceToMerge
((
void
*
)(
*
pCache
));
i
d
xCacheForceToMerge
((
void
*
)(
*
pCache
));
indexInfo
(
"%s wait to merge"
,
(
*
pCache
)
->
colName
);
indexWait
((
void
*
)(
sIdx
));
indexInfo
(
"%s finish to wait"
,
(
*
pCache
)
->
colName
);
iter
=
taosHashIterate
(
sIdx
->
colObj
,
iter
);
i
nde
xCacheUnRef
(
*
pCache
);
i
d
xCacheUnRef
(
*
pCache
);
}
taosHashCleanup
(
sIdx
->
colObj
);
sIdx
->
colObj
=
NULL
;
}
i
nde
xReleaseRef
(
sIdx
->
refId
);
i
nde
xRemoveRef
(
sIdx
->
refId
);
i
d
xReleaseRef
(
sIdx
->
refId
);
i
d
xRemoveRef
(
sIdx
->
refId
);
}
int64_t
i
nde
xAddRef
(
void
*
p
)
{
int64_t
i
d
xAddRef
(
void
*
p
)
{
// impl
return
taosAddRef
(
indexRefMgt
,
p
);
}
int32_t
i
nde
xRemoveRef
(
int64_t
ref
)
{
int32_t
i
d
xRemoveRef
(
int64_t
ref
)
{
// impl later
return
taosRemoveRef
(
indexRefMgt
,
ref
);
}
void
i
nde
xAcquireRef
(
int64_t
ref
)
{
void
i
d
xAcquireRef
(
int64_t
ref
)
{
// impl
taosAcquireRef
(
indexRefMgt
,
ref
);
}
void
i
nde
xReleaseRef
(
int64_t
ref
)
{
void
i
d
xReleaseRef
(
int64_t
ref
)
{
// impl
taosReleaseRef
(
indexRefMgt
,
ref
);
}
...
...
@@ -186,11 +186,11 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
),
.
colType
=
p
->
colType
};
int32_t
sz
=
i
nde
xSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
i
d
xSerialCacheKey
(
&
key
,
buf
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
if
(
cache
==
NULL
)
{
IndexCache
*
pCache
=
i
nde
xCacheCreate
(
index
,
p
->
suid
,
p
->
colName
,
p
->
colType
);
IndexCache
*
pCache
=
i
d
xCacheCreate
(
index
,
p
->
suid
,
p
->
colName
,
p
->
colType
);
taosHashPut
(
index
->
colObj
,
buf
,
sz
,
&
pCache
,
sizeof
(
void
*
));
}
}
...
...
@@ -201,12 +201,12 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
),
.
colType
=
p
->
colType
};
int32_t
sz
=
i
nde
xSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
i
d
xSerialCacheKey
(
&
key
,
buf
);
indexDebug
(
"w suid: %"
PRIu64
", colName: %s, colType: %d"
,
key
.
suid
,
key
.
colName
,
key
.
colType
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
assert
(
*
cache
!=
NULL
);
int
ret
=
i
nde
xCachePut
(
*
cache
,
p
,
uid
);
int
ret
=
i
d
xCachePut
(
*
cache
,
p
,
uid
);
if
(
ret
!=
0
)
{
return
ret
;
}
...
...
@@ -289,7 +289,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
tm
->
nColName
=
nColName
;
char
*
buf
=
NULL
;
int32_t
len
=
idxConvertDataToStr
((
void
*
)
colVal
,
I
NDE
X_TYPE_GET_TYPE
(
colType
),
(
void
**
)
&
buf
);
int32_t
len
=
idxConvertDataToStr
((
void
*
)
colVal
,
I
D
X_TYPE_GET_TYPE
(
colType
),
(
void
**
)
&
buf
);
assert
(
len
!=
-
1
);
tm
->
colVal
=
buf
;
...
...
@@ -331,7 +331,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
ICacheKey
key
=
{
.
suid
=
term
->
suid
,
.
colName
=
term
->
colName
,
.
nColName
=
strlen
(
term
->
colName
),
.
colType
=
term
->
colType
};
indexDebug
(
"r suid: %"
PRIu64
", colName: %s, colType: %d"
,
key
.
suid
,
key
.
colName
,
key
.
colType
);
int32_t
sz
=
i
nde
xSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
i
d
xSerialCacheKey
(
&
key
,
buf
);
taosThreadMutexLock
(
&
sIdx
->
mtx
);
IndexCache
**
pCache
=
taosHashGet
(
sIdx
->
colObj
,
buf
,
sz
);
...
...
@@ -345,14 +345,14 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
int64_t
st
=
taosGetTimestampUs
();
SIdxTRslt
*
tr
=
idxTRsltCreate
();
if
(
0
==
i
nde
xCacheSearch
(
cache
,
query
,
tr
,
&
s
))
{
if
(
0
==
i
d
xCacheSearch
(
cache
,
query
,
tr
,
&
s
))
{
if
(
s
==
kTypeDeletion
)
{
indexInfo
(
"col: %s already drop by"
,
term
->
colName
);
// coloum already drop by other oper, no need to query tindex
return
0
;
}
else
{
st
=
taosGetTimestampUs
();
if
(
0
!=
i
nde
xTFileSearch
(
sIdx
->
tindex
,
query
,
tr
))
{
if
(
0
!=
i
d
xTFileSearch
(
sIdx
->
tindex
,
query
,
tr
))
{
indexError
(
"corrupt at index(TFile) col:%s val: %s"
,
term
->
colName
,
term
->
colVal
);
goto
END
;
}
...
...
@@ -465,23 +465,23 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
IndexCache
*
pCache
=
(
IndexCache
*
)
cache
;
while
(
quit
&&
atomic_load_32
(
&
pCache
->
merging
)
==
1
)
{
}
while
(
quit
&&
atomic_load_32
(
&
pCache
->
merging
)
==
1
)
;
TFileReader
*
pReader
=
tfileGetReaderByCol
(
sIdx
->
tindex
,
pCache
->
suid
,
pCache
->
colName
);
if
(
pReader
==
NULL
)
{
indexWarn
(
"empty tfile reader found"
);
}
// handle flush
Iterate
*
cacheIter
=
i
nde
xCacheIteratorCreate
(
pCache
);
Iterate
*
cacheIter
=
i
d
xCacheIteratorCreate
(
pCache
);
if
(
cacheIter
==
NULL
)
{
indexError
(
"%p immtable is empty, ignore merge opera"
,
pCache
);
idxCacheDestroyImm
(
pCache
);
tfileReaderUnRef
(
pReader
);
atomic_store_32
(
&
pCache
->
merging
,
0
);
if
(
quit
)
{
i
nde
xPost
(
sIdx
);
i
d
xPost
(
sIdx
);
}
i
nde
xReleaseRef
(
sIdx
->
refId
);
i
d
xReleaseRef
(
sIdx
->
refId
);
return
0
;
}
...
...
@@ -532,7 +532,7 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
tfileIteratorDestroy
(
tfileIter
);
tfileReaderUnRef
(
pReader
);
i
nde
xCacheUnRef
(
pCache
);
i
d
xCacheUnRef
(
pCache
);
int64_t
cost
=
taosGetTimestampUs
()
-
st
;
if
(
ret
!=
0
)
{
...
...
@@ -542,9 +542,9 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
}
atomic_store_32
(
&
pCache
->
merging
,
0
);
if
(
quit
)
{
i
nde
xPost
(
sIdx
);
i
d
xPost
(
sIdx
);
}
i
nde
xReleaseRef
(
sIdx
->
refId
);
i
d
xReleaseRef
(
sIdx
->
refId
);
return
ret
;
}
...
...
@@ -561,7 +561,7 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
value
->
colVal
=
NULL
;
}
static
int64_t
i
ndexGetAvaial
bleVer
(
SIndex
*
sIdx
,
IndexCache
*
cache
)
{
static
int64_t
i
dxGetAvaila
bleVer
(
SIndex
*
sIdx
,
IndexCache
*
cache
)
{
ICacheKey
key
=
{.
suid
=
cache
->
suid
,
.
colName
=
cache
->
colName
,
.
nColName
=
strlen
(
cache
->
colName
)};
int64_t
ver
=
CACHE_VERSION
(
cache
);
...
...
@@ -579,7 +579,7 @@ static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
return
ver
;
}
static
int
idxGenTFile
(
SIndex
*
sIdx
,
IndexCache
*
cache
,
SArray
*
batch
)
{
int64_t
version
=
i
ndexGetAvaial
bleVer
(
sIdx
,
cache
);
int64_t
version
=
i
dxGetAvaila
bleVer
(
sIdx
,
cache
);
indexInfo
(
"file name version: %"
PRId64
""
,
version
);
uint8_t
colType
=
cache
->
type
;
...
...
@@ -620,8 +620,8 @@ END:
return
-
1
;
}
int32_t
i
nde
xSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
)
{
bool
hasJson
=
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
key
->
colType
,
TSDB_DATA_TYPE_JSON
);
int32_t
i
d
xSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
)
{
bool
hasJson
=
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
key
->
colType
,
TSDB_DATA_TYPE_JSON
);
char
*
p
=
buf
;
char
tbuf
[
65
]
=
{
0
};
...
...
source/libs/index/src/indexCache.c
浏览文件 @
a4ad2b3f
...
...
@@ -68,7 +68,7 @@ static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRsl
cacheSearchLessThan_JSON
,
cacheSearchLessEqual_JSON
,
cacheSearchGreaterThan_JSON
,
cacheSearchGreaterEqual_JSON
,
cacheSearchRange_JSON
}};
static
void
d
oMergeWork
(
SSchedMsg
*
msg
);
static
void
idxD
oMergeWork
(
SSchedMsg
*
msg
);
static
bool
idxCacheIteratorNext
(
Iterate
*
itera
);
static
int32_t
cacheSearchTerm
(
void
*
cache
,
SIndexTerm
*
term
,
SIdxTRslt
*
tr
,
STermValueType
*
s
)
{
...
...
@@ -127,7 +127,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
MemTable
*
mem
=
cache
;
IndexCache
*
pCache
=
mem
->
pCache
;
_cache_range_compare
cmpFn
=
i
nde
xGetCompare
(
type
);
_cache_range_compare
cmpFn
=
i
d
xGetCompare
(
type
);
CacheTerm
*
pCt
=
taosMemoryCalloc
(
1
,
sizeof
(
CacheTerm
));
pCt
->
colVal
=
term
->
colVal
;
...
...
@@ -187,7 +187,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
pCt
->
version
=
atomic_load_64
(
&
pCache
->
version
);
char
*
exBuf
=
NULL
;
if
(
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
))
{
if
(
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
))
{
exBuf
=
idxPackJsonData
(
term
);
pCt
->
colVal
=
exBuf
;
}
...
...
@@ -257,7 +257,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
if
(
cache
==
NULL
)
{
return
0
;
}
_cache_range_compare
cmpFn
=
i
nde
xGetCompare
(
type
);
_cache_range_compare
cmpFn
=
i
d
xGetCompare
(
type
);
MemTable
*
mem
=
cache
;
IndexCache
*
pCache
=
mem
->
pCache
;
...
...
@@ -266,7 +266,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
pCt
->
colVal
=
term
->
colVal
;
pCt
->
version
=
atomic_load_64
(
&
pCache
->
version
);
int8_t
dType
=
I
NDE
X_TYPE_GET_TYPE
(
term
->
colType
);
int8_t
dType
=
I
D
X_TYPE_GET_TYPE
(
term
->
colType
);
int
skip
=
0
;
char
*
exBuf
=
NULL
;
if
(
type
==
CONTAINS
)
{
...
...
@@ -331,9 +331,9 @@ static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTRslt* tr, ST
// impl later
return
0
;
}
static
IterateValue
*
i
nde
xCacheIteratorGetValue
(
Iterate
*
iter
);
static
IterateValue
*
i
d
xCacheIteratorGetValue
(
Iterate
*
iter
);
IndexCache
*
i
nde
xCacheCreate
(
SIndex
*
idx
,
uint64_t
suid
,
const
char
*
colName
,
int8_t
type
)
{
IndexCache
*
i
d
xCacheCreate
(
SIndex
*
idx
,
uint64_t
suid
,
const
char
*
colName
,
int8_t
type
)
{
IndexCache
*
cache
=
taosMemoryCalloc
(
1
,
sizeof
(
IndexCache
));
if
(
cache
==
NULL
)
{
indexError
(
"failed to create index cache"
);
...
...
@@ -342,7 +342,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
cache
->
mem
=
idxInternalCacheCreate
(
type
);
cache
->
mem
->
pCache
=
cache
;
cache
->
colName
=
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
type
,
TSDB_DATA_TYPE_JSON
)
?
tstrdup
(
JSON_COLUMN
)
:
tstrdup
(
colName
);
cache
->
colName
=
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
type
,
TSDB_DATA_TYPE_JSON
)
?
tstrdup
(
JSON_COLUMN
)
:
tstrdup
(
colName
);
cache
->
type
=
type
;
cache
->
index
=
idx
;
cache
->
version
=
0
;
...
...
@@ -352,13 +352,13 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
taosThreadMutexInit
(
&
cache
->
mtx
,
NULL
);
taosThreadCondInit
(
&
cache
->
finished
,
NULL
);
i
nde
xCacheRef
(
cache
);
i
d
xCacheRef
(
cache
);
if
(
idx
!=
NULL
)
{
i
nde
xAcquireRef
(
idx
->
refId
);
i
d
xAcquireRef
(
idx
->
refId
);
}
return
cache
;
}
void
i
nde
xCacheDebug
(
IndexCache
*
cache
)
{
void
i
d
xCacheDebug
(
IndexCache
*
cache
)
{
MemTable
*
tbl
=
NULL
;
taosThreadMutexLock
(
&
cache
->
mtx
);
...
...
@@ -405,7 +405,7 @@ void indexCacheDebug(IndexCache* cache) {
}
}
void
i
nde
xCacheDestroySkiplist
(
SSkipList
*
slt
)
{
void
i
d
xCacheDestroySkiplist
(
SSkipList
*
slt
)
{
SSkipListIterator
*
iter
=
tSkipListCreateIter
(
slt
);
while
(
iter
!=
NULL
&&
tSkipListIterNext
(
iter
))
{
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
...
...
@@ -418,11 +418,11 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
tSkipListDestroyIter
(
iter
);
tSkipListDestroy
(
slt
);
}
void
i
nde
xCacheBroadcast
(
void
*
cache
)
{
void
i
d
xCacheBroadcast
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
taosThreadCondBroadcast
(
&
pCache
->
finished
);
}
void
i
nde
xCacheWait
(
void
*
cache
)
{
void
i
d
xCacheWait
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
taosThreadCondWait
(
&
pCache
->
finished
,
&
pCache
->
mtx
);
}
...
...
@@ -435,14 +435,14 @@ void idxCacheDestroyImm(IndexCache* cache) {
tbl
=
cache
->
imm
;
cache
->
imm
=
NULL
;
// or throw int bg thread
i
nde
xCacheBroadcast
(
cache
);
i
d
xCacheBroadcast
(
cache
);
taosThreadMutexUnlock
(
&
cache
->
mtx
);
idxMemUnRef
(
tbl
);
idxMemUnRef
(
tbl
);
}
void
i
nde
xCacheDestroy
(
void
*
cache
)
{
void
i
d
xCacheDestroy
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
if
(
pCache
==
NULL
)
{
return
;
...
...
@@ -455,12 +455,12 @@ void indexCacheDestroy(void* cache) {
taosThreadMutexDestroy
(
&
pCache
->
mtx
);
taosThreadCondDestroy
(
&
pCache
->
finished
);
if
(
pCache
->
index
!=
NULL
)
{
i
nde
xReleaseRef
(((
SIndex
*
)
pCache
->
index
)
->
refId
);
i
d
xReleaseRef
(((
SIndex
*
)
pCache
->
index
)
->
refId
);
}
taosMemoryFree
(
pCache
);
}
Iterate
*
i
nde
xCacheIteratorCreate
(
IndexCache
*
cache
)
{
Iterate
*
i
d
xCacheIteratorCreate
(
IndexCache
*
cache
)
{
if
(
cache
->
imm
==
NULL
)
{
return
NULL
;
}
...
...
@@ -477,7 +477,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
iiter
->
val
.
colVal
=
NULL
;
iiter
->
iter
=
tbl
!=
NULL
?
tSkipListCreateIter
(
tbl
->
mem
)
:
NULL
;
iiter
->
next
=
idxCacheIteratorNext
;
iiter
->
getValue
=
i
nde
xCacheIteratorGetValue
;
iiter
->
getValue
=
i
d
xCacheIteratorGetValue
;
taosThreadMutexUnlock
(
&
cache
->
mtx
);
...
...
@@ -492,30 +492,30 @@ void idxCacheIteratorDestroy(Iterate* iter) {
taosMemoryFree
(
iter
);
}
int
i
nde
xCacheSchedToMerge
(
IndexCache
*
pCache
,
bool
notify
)
{
int
i
d
xCacheSchedToMerge
(
IndexCache
*
pCache
,
bool
notify
)
{
SSchedMsg
schedMsg
=
{
0
};
schedMsg
.
fp
=
d
oMergeWork
;
schedMsg
.
fp
=
idxD
oMergeWork
;
schedMsg
.
ahandle
=
pCache
;
if
(
notify
)
{
schedMsg
.
thandle
=
taosMemoryMalloc
(
1
);
}
schedMsg
.
msg
=
NULL
;
i
nde
xAcquireRef
(
pCache
->
index
->
refId
);
i
d
xAcquireRef
(
pCache
->
index
->
refId
);
taosScheduleTask
(
indexQhandle
,
&
schedMsg
);
return
0
;
}
static
void
i
nde
xCacheMakeRoomForWrite
(
IndexCache
*
cache
)
{
static
void
i
d
xCacheMakeRoomForWrite
(
IndexCache
*
cache
)
{
while
(
true
)
{
if
(
cache
->
occupiedMem
*
MEM_ESTIMATE_RADIO
<
MEM_THRESHOLD
)
{
break
;
}
else
if
(
cache
->
imm
!=
NULL
)
{
// TODO: wake up by condition variable
i
nde
xCacheWait
(
cache
);
i
d
xCacheWait
(
cache
);
}
else
{
bool
quit
=
cache
->
occupiedMem
>=
MEM_SIGNAL_QUIT
?
true
:
false
;
i
nde
xCacheRef
(
cache
);
i
d
xCacheRef
(
cache
);
cache
->
imm
=
cache
->
mem
;
cache
->
mem
=
idxInternalCacheCreate
(
cache
->
type
);
cache
->
mem
->
pCache
=
cache
;
...
...
@@ -525,18 +525,18 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
}
// sched to merge
// unref cache in bgwork
i
nde
xCacheSchedToMerge
(
cache
,
quit
);
i
d
xCacheSchedToMerge
(
cache
,
quit
);
}
}
}
int
i
nde
xCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
int
i
d
xCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
bool
hasJson
=
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
);
bool
hasJson
=
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
);
IndexCache
*
pCache
=
cache
;
i
nde
xCacheRef
(
pCache
);
i
d
xCacheRef
(
pCache
);
// encode data
CacheTerm
*
ct
=
taosMemoryCalloc
(
1
,
sizeof
(
CacheTerm
));
if
(
cache
==
NULL
)
{
...
...
@@ -559,7 +559,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
taosThreadMutexLock
(
&
pCache
->
mtx
);
pCache
->
occupiedMem
+=
estimate
;
i
nde
xCacheMakeRoomForWrite
(
pCache
);
i
d
xCacheMakeRoomForWrite
(
pCache
);
MemTable
*
tbl
=
pCache
->
mem
;
idxMemRef
(
tbl
);
tSkipListPut
(
tbl
->
mem
,
(
char
*
)
ct
);
...
...
@@ -567,29 +567,29 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
taosThreadMutexUnlock
(
&
pCache
->
mtx
);
i
nde
xCacheUnRef
(
pCache
);
i
d
xCacheUnRef
(
pCache
);
return
0
;
// encode end
}
void
i
nde
xCacheForceToMerge
(
void
*
cache
)
{
void
i
d
xCacheForceToMerge
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
i
nde
xCacheRef
(
pCache
);
i
d
xCacheRef
(
pCache
);
taosThreadMutexLock
(
&
pCache
->
mtx
);
indexInfo
(
"%p is forced to merge into tfile"
,
pCache
);
pCache
->
occupiedMem
+=
MEM_SIGNAL_QUIT
;
i
nde
xCacheMakeRoomForWrite
(
pCache
);
i
d
xCacheMakeRoomForWrite
(
pCache
);
taosThreadMutexUnlock
(
&
pCache
->
mtx
);
i
nde
xCacheUnRef
(
pCache
);
i
d
xCacheUnRef
(
pCache
);
return
;
}
int
i
nde
xCacheDel
(
void
*
cache
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
int
i
d
xCacheDel
(
void
*
cache
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
IndexCache
*
pCache
=
cache
;
return
0
;
}
static
int32_t
i
nde
xQueryMem
(
MemTable
*
mem
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
,
STermValueType
*
s
)
{
static
int32_t
i
d
xQueryMem
(
MemTable
*
mem
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
,
STermValueType
*
s
)
{
if
(
mem
==
NULL
)
{
return
0
;
}
...
...
@@ -597,13 +597,13 @@ static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* t
SIndexTerm
*
term
=
query
->
term
;
EIndexQueryType
qtype
=
query
->
qType
;
if
(
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
))
{
if
(
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
))
{
return
cacheSearch
[
1
][
qtype
](
mem
,
term
,
tr
,
s
);
}
else
{
return
cacheSearch
[
0
][
qtype
](
mem
,
term
,
tr
,
s
);
}
}
int
i
nde
xCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
result
,
STermValueType
*
s
)
{
int
i
d
xCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
result
,
STermValueType
*
s
)
{
int64_t
st
=
taosGetTimestampUs
();
if
(
cache
==
NULL
)
{
return
0
;
...
...
@@ -618,10 +618,10 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STe
idxMemRef
(
imm
);
taosThreadMutexUnlock
(
&
pCache
->
mtx
);
int
ret
=
(
mem
&&
mem
->
mem
)
?
i
nde
xQueryMem
(
mem
,
query
,
result
,
s
)
:
0
;
int
ret
=
(
mem
&&
mem
->
mem
)
?
i
d
xQueryMem
(
mem
,
query
,
result
,
s
)
:
0
;
if
(
ret
==
0
&&
*
s
!=
kTypeDeletion
)
{
// continue search in imm
ret
=
(
imm
&&
imm
->
mem
)
?
i
nde
xQueryMem
(
imm
,
query
,
result
,
s
)
:
0
;
ret
=
(
imm
&&
imm
->
mem
)
?
i
d
xQueryMem
(
imm
,
query
,
result
,
s
)
:
0
;
}
idxMemUnRef
(
mem
);
...
...
@@ -631,20 +631,20 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STe
return
ret
;
}
void
i
nde
xCacheRef
(
IndexCache
*
cache
)
{
void
i
d
xCacheRef
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
int
ref
=
T_REF_INC
(
cache
);
UNUSED
(
ref
);
}
void
i
nde
xCacheUnRef
(
IndexCache
*
cache
)
{
void
i
d
xCacheUnRef
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
int
ref
=
T_REF_DEC
(
cache
);
if
(
ref
==
0
)
{
i
nde
xCacheDestroy
(
cache
);
i
d
xCacheDestroy
(
cache
);
}
}
...
...
@@ -662,7 +662,7 @@ void idxMemUnRef(MemTable* tbl) {
int
ref
=
T_REF_DEC
(
tbl
);
if
(
ref
==
0
)
{
SSkipList
*
slt
=
tbl
->
mem
;
i
nde
xCacheDestroySkiplist
(
slt
);
i
d
xCacheDestroySkiplist
(
slt
);
taosMemoryFree
(
tbl
);
}
}
...
...
@@ -693,15 +693,15 @@ static int32_t idxCacheTermCompare(const void* l, const void* r) {
return
cmp
;
}
static
int
i
nde
xFindCh
(
char
*
a
,
char
c
)
{
static
int
i
d
xFindCh
(
char
*
a
,
char
c
)
{
char
*
p
=
a
;
while
(
*
p
!=
0
&&
*
p
++
!=
c
)
{
}
return
p
-
a
;
}
static
int
idxCacheJsonTermCompareImpl
(
char
*
a
,
char
*
b
)
{
// int alen = i
nde
xFindCh(a, '&');
// int blen = i
nde
xFindCh(b, '&');
// int alen = i
d
xFindCh(a, '&');
// int blen = i
d
xFindCh(b, '&');
// int cmp = strncmp(a, b, MIN(alen, blen));
// if (cmp == 0) {
...
...
@@ -730,9 +730,9 @@ static int32_t idxCacheJsonTermCompare(const void* l, const void* r) {
return
cmp
;
}
static
MemTable
*
idxInternalCacheCreate
(
int8_t
type
)
{
int
ttype
=
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
type
,
TSDB_DATA_TYPE_JSON
)
?
TSDB_DATA_TYPE_BINARY
:
TSDB_DATA_TYPE_BINARY
;
int
ttype
=
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
type
,
TSDB_DATA_TYPE_JSON
)
?
TSDB_DATA_TYPE_BINARY
:
TSDB_DATA_TYPE_BINARY
;
int32_t
(
*
cmpFn
)(
const
void
*
l
,
const
void
*
r
)
=
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
type
,
TSDB_DATA_TYPE_JSON
)
?
idxCacheJsonTermCompare
:
idxCacheTermCompare
;
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
type
,
TSDB_DATA_TYPE_JSON
)
?
idxCacheJsonTermCompare
:
idxCacheTermCompare
;
MemTable
*
tbl
=
taosMemoryCalloc
(
1
,
sizeof
(
MemTable
));
idxMemRef
(
tbl
);
...
...
@@ -742,7 +742,7 @@ static MemTable* idxInternalCacheCreate(int8_t type) {
return
tbl
;
}
static
void
d
oMergeWork
(
SSchedMsg
*
msg
)
{
static
void
idxD
oMergeWork
(
SSchedMsg
*
msg
)
{
IndexCache
*
pCache
=
msg
->
ahandle
;
SIndex
*
sidx
=
(
SIndex
*
)
pCache
->
index
;
...
...
@@ -771,7 +771,7 @@ static bool idxCacheIteratorNext(Iterate* itera) {
return
next
;
}
static
IterateValue
*
i
nde
xCacheIteratorGetValue
(
Iterate
*
iter
)
{
static
IterateValue
*
i
d
xCacheIteratorGetValue
(
Iterate
*
iter
)
{
// opt later
return
&
iter
->
val
;
}
source/libs/index/src/indexComm.c
浏览文件 @
a4ad2b3f
...
...
@@ -75,35 +75,35 @@ char* idxInt2str(int64_t val, char* dst, int radix) {
;
return
dst
-
1
;
}
__compar_fn_t
i
nde
xGetCompar
(
int8_t
type
)
{
__compar_fn_t
i
d
xGetCompar
(
int8_t
type
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
return
(
__compar_fn_t
)
strcmp
;
}
return
getComparFunc
(
type
,
0
);
}
static
TExeCond
tCompareLessThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
i
nde
xGetCompar
(
type
);
__compar_fn_t
func
=
i
d
xGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_LESS_THAN
,
a
,
b
,
type
);
}
static
TExeCond
tCompareLessEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
i
nde
xGetCompar
(
type
);
__compar_fn_t
func
=
i
d
xGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_LESS_EQUAL
,
a
,
b
,
type
);
}
static
TExeCond
tCompareGreaterThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
i
nde
xGetCompar
(
type
);
__compar_fn_t
func
=
i
d
xGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_GREATER_THAN
,
a
,
b
,
type
);
}
static
TExeCond
tCompareGreaterEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
i
nde
xGetCompar
(
type
);
__compar_fn_t
func
=
i
d
xGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_GREATER_EQUAL
,
a
,
b
,
type
);
}
static
TExeCond
tCompareContains
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
i
nde
xGetCompar
(
type
);
__compar_fn_t
func
=
i
d
xGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_TERM
,
a
,
b
,
type
);
}
static
TExeCond
tCompareEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
i
nde
xGetCompar
(
type
);
__compar_fn_t
func
=
i
d
xGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_TERM
,
a
,
b
,
type
);
}
TExeCond
tCompare
(
__compar_fn_t
func
,
int8_t
cmptype
,
void
*
a
,
void
*
b
,
int8_t
dtype
)
{
...
...
@@ -205,14 +205,14 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
static
TExeCond
(
*
rangeCompare
[])(
void
*
a
,
void
*
b
,
int8_t
type
)
=
{
tCompareLessThan
,
tCompareLessEqual
,
tCompareGreaterThan
,
tCompareGreaterEqual
,
tCompareContains
,
tCompareEqual
};
_cache_range_compare
i
nde
xGetCompare
(
RangeType
ty
)
{
return
rangeCompare
[
ty
];
}
_cache_range_compare
i
d
xGetCompare
(
RangeType
ty
)
{
return
rangeCompare
[
ty
];
}
char
*
idxPackJsonData
(
SIndexTerm
*
itm
)
{
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t
ty
=
I
NDE
X_TYPE_GET_TYPE
(
itm
->
colType
);
uint8_t
ty
=
I
D
X_TYPE_GET_TYPE
(
itm
->
colType
);
int32_t
sz
=
itm
->
nColName
+
itm
->
nColVal
+
sizeof
(
uint8_t
)
+
sizeof
(
JSON_VALUE_DELIM
)
*
2
+
1
;
char
*
buf
=
(
char
*
)
taosMemoryCalloc
(
1
,
sz
);
...
...
@@ -240,7 +240,7 @@ char* idxPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t
ty
=
I
NDE
X_TYPE_GET_TYPE
(
itm
->
colType
);
uint8_t
ty
=
I
D
X_TYPE_GET_TYPE
(
itm
->
colType
);
int32_t
sz
=
itm
->
nColName
+
itm
->
nColVal
+
sizeof
(
uint8_t
)
+
sizeof
(
JSON_VALUE_DELIM
)
*
2
+
1
;
char
*
buf
=
(
char
*
)
taosMemoryCalloc
(
1
,
sz
);
...
...
@@ -267,7 +267,7 @@ char* idxPackJsonDataPrefixNoType(SIndexTerm* itm, int32_t* skip) {
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t
ty
=
I
NDE
X_TYPE_GET_TYPE
(
itm
->
colType
);
uint8_t
ty
=
I
D
X_TYPE_GET_TYPE
(
itm
->
colType
);
int32_t
sz
=
itm
->
nColName
+
itm
->
nColVal
+
sizeof
(
uint8_t
)
+
sizeof
(
JSON_VALUE_DELIM
)
*
2
+
1
;
char
*
buf
=
(
char
*
)
taosMemoryCalloc
(
1
,
sz
);
...
...
source/libs/index/src/indexFilter.c
浏览文件 @
a4ad2b3f
...
...
@@ -318,7 +318,7 @@ int sifLessThan(void *a, void *b, int16_t dtype) {
}
int
sifEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
//__compar_fn_t func = i
nde
xGetCompar(dtype);
//__compar_fn_t func = i
d
xGetCompar(dtype);
return
(
int
)
tDoCompare
(
func
,
QUERY_TERM
,
a
,
b
);
}
static
Filter
sifGetFilterFunc
(
EIndexQueryType
type
,
bool
*
reverse
)
{
...
...
source/libs/index/src/indexJson.c
浏览文件 @
a4ad2b3f
...
...
@@ -30,7 +30,7 @@ int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
}
else
{
p
->
colType
=
TSDB_DATA_TYPE_DOUBLE
;
}
I
NDE
X_TYPE_ADD_EXTERN_TYPE
(
p
->
colType
,
TSDB_DATA_TYPE_JSON
);
I
D
X_TYPE_ADD_EXTERN_TYPE
(
p
->
colType
,
TSDB_DATA_TYPE_JSON
);
}
// handle put
return
indexPut
(
index
,
terms
,
uid
);
...
...
@@ -48,7 +48,7 @@ int indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *res
}
else
{
p
->
colType
=
TSDB_DATA_TYPE_DOUBLE
;
}
I
NDE
X_TYPE_ADD_EXTERN_TYPE
(
p
->
colType
,
TSDB_DATA_TYPE_JSON
);
I
D
X_TYPE_ADD_EXTERN_TYPE
(
p
->
colType
,
TSDB_DATA_TYPE_JSON
);
}
// handle search
return
indexSearch
(
index
,
tq
,
result
);
...
...
source/libs/index/src/indexTfile.c
浏览文件 @
a4ad2b3f
...
...
@@ -118,7 +118,7 @@ TFileCache* tfileCacheCreate(const char* path) {
ICacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
(
int32_t
)
strlen
(
header
->
colName
)};
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
i
nde
xSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
i
d
xSerialCacheKey
(
&
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
taosHashPut
(
tcache
->
tableCache
,
buf
,
sz
,
&
reader
,
sizeof
(
void
*
));
tfileReaderRef
(
reader
);
...
...
@@ -149,7 +149,7 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
ICacheKey
*
key
)
{
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
i
nde
xSerialCacheKey
(
key
,
buf
);
int32_t
sz
=
i
d
xSerialCacheKey
(
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
TFileReader
**
reader
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
sz
);
if
(
reader
==
NULL
||
*
reader
==
NULL
)
{
...
...
@@ -161,7 +161,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
}
void
tfileCachePut
(
TFileCache
*
tcache
,
ICacheKey
*
key
,
TFileReader
*
reader
)
{
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
i
nde
xSerialCacheKey
(
key
,
buf
);
int32_t
sz
=
i
d
xSerialCacheKey
(
key
,
buf
);
// remove last version index reader
TFileReader
**
p
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
sz
);
if
(
p
!=
NULL
&&
*
p
!=
NULL
)
{
...
...
@@ -281,7 +281,7 @@ static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return
0
;
}
static
int32_t
tfSearchRegex
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTRslt
*
tr
)
{
bool
hasJson
=
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
tem
->
colType
,
TSDB_DATA_TYPE_JSON
);
bool
hasJson
=
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
tem
->
colType
,
TSDB_DATA_TYPE_JSON
);
int
ret
=
0
;
char
*
p
=
tem
->
colVal
;
...
...
@@ -305,7 +305,7 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr,
int
ret
=
0
;
char
*
p
=
tem
->
colVal
;
int
skip
=
0
;
_cache_range_compare
cmpFn
=
i
nde
xGetCompare
(
type
);
_cache_range_compare
cmpFn
=
i
d
xGetCompare
(
type
);
SArray
*
offsets
=
taosArrayInit
(
16
,
sizeof
(
uint64_t
));
...
...
@@ -431,7 +431,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
p
=
idxPackJsonDataPrefix
(
tem
,
&
skip
);
}
_cache_range_compare
cmpFn
=
i
nde
xGetCompare
(
ctype
);
_cache_range_compare
cmpFn
=
i
d
xGetCompare
(
ctype
);
SArray
*
offsets
=
taosArrayInit
(
16
,
sizeof
(
uint64_t
));
...
...
@@ -457,7 +457,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
}
else
if
(
0
!=
strncmp
(
ch
,
p
,
skip
))
{
continue
;
}
cond
=
cmpFn
(
ch
+
skip
,
tem
->
colVal
,
I
NDE
X_TYPE_GET_TYPE
(
tem
->
colType
));
cond
=
cmpFn
(
ch
+
skip
,
tem
->
colVal
,
I
D
X_TYPE_GET_TYPE
(
tem
->
colType
));
}
if
(
MATCH
==
cond
)
{
tfileReaderLoadTableIds
((
TFileReader
*
)
reader
,
rt
->
out
.
out
,
tr
->
total
);
...
...
@@ -476,7 +476,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr
SIndexTerm
*
term
=
query
->
term
;
EIndexQueryType
qtype
=
query
->
qType
;
int
ret
=
0
;
if
(
I
NDE
X_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
))
{
if
(
I
D
X_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
))
{
ret
=
tfSearch
[
1
][
qtype
](
reader
,
term
,
tr
);
}
else
{
ret
=
tfSearch
[
0
][
qtype
](
reader
,
term
,
tr
);
...
...
@@ -536,7 +536,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
__compar_fn_t
fn
;
int8_t
colType
=
tw
->
header
.
colType
;
colType
=
I
NDE
X_TYPE_GET_TYPE
(
colType
);
colType
=
I
D
X_TYPE_GET_TYPE
(
colType
);
if
(
colType
==
TSDB_DATA_TYPE_BINARY
||
colType
==
TSDB_DATA_TYPE_NCHAR
)
{
fn
=
tfileStrCompare
;
}
else
{
...
...
@@ -620,7 +620,7 @@ void tfileWriterDestroy(TFileWriter* tw) {
taosMemoryFree
(
tw
);
}
IndexTFile
*
i
nde
xTFileCreate
(
const
char
*
path
)
{
IndexTFile
*
i
d
xTFileCreate
(
const
char
*
path
)
{
TFileCache
*
cache
=
tfileCacheCreate
(
path
);
if
(
cache
==
NULL
)
{
return
NULL
;
...
...
@@ -635,7 +635,7 @@ IndexTFile* indexTFileCreate(const char* path) {
tfile
->
cache
=
cache
;
return
tfile
;
}
void
i
nde
xTFileDestroy
(
IndexTFile
*
tfile
)
{
void
i
d
xTFileDestroy
(
IndexTFile
*
tfile
)
{
if
(
tfile
==
NULL
)
{
return
;
}
...
...
@@ -644,7 +644,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
taosMemoryFree
(
tfile
);
}
int
i
nde
xTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
result
)
{
int
i
d
xTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
result
)
{
int
ret
=
-
1
;
if
(
tfile
==
NULL
)
{
return
ret
;
...
...
@@ -667,7 +667,7 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
return
tfileReaderSearch
(
reader
,
query
,
result
);
}
int
i
nde
xTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
int
i
d
xTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName =
// term->nColName, .version = 1};
...
...
@@ -845,7 +845,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
TFileHeader
*
header
=
&
write
->
header
;
uint8_t
colType
=
header
->
colType
;
colType
=
I
NDE
X_TYPE_GET_TYPE
(
colType
);
colType
=
I
D
X_TYPE_GET_TYPE
(
colType
);
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)(
tval
->
colVal
),
(
size_t
)
strlen
(
tval
->
colVal
));
if
(
fstBuilderInsert
(
write
->
fb
,
key
,
tval
->
offset
))
{
fstSliceDestroy
(
&
key
);
...
...
source/libs/index/test/indexTests.cc
浏览文件 @
a4ad2b3f
...
...
@@ -521,10 +521,10 @@ class CacheObj {
public:
CacheObj
()
{
// TODO
cache
=
i
nde
xCacheCreate
(
NULL
,
0
,
"voltage"
,
TSDB_DATA_TYPE_BINARY
);
cache
=
i
d
xCacheCreate
(
NULL
,
0
,
"voltage"
,
TSDB_DATA_TYPE_BINARY
);
}
int
Put
(
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
)
{
int
ret
=
i
nde
xCachePut
(
cache
,
term
,
uid
);
int
ret
=
i
d
xCachePut
(
cache
,
term
,
uid
);
if
(
ret
!=
0
)
{
//
std
::
cout
<<
"failed to put into cache: "
<<
ret
<<
std
::
endl
;
...
...
@@ -533,12 +533,12 @@ class CacheObj {
}
void
Debug
()
{
//
i
nde
xCacheDebug
(
cache
);
i
d
xCacheDebug
(
cache
);
}
int
Get
(
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
)
{
SIdxTRslt
*
tr
=
idxTRsltCreate
();
int
ret
=
i
nde
xCacheSearch
(
cache
,
query
,
tr
,
s
);
int
ret
=
i
d
xCacheSearch
(
cache
,
query
,
tr
,
s
);
idxTRsltMergeTo
(
tr
,
result
);
idxTRsltDestroy
(
tr
);
...
...
@@ -549,7 +549,7 @@ class CacheObj {
}
~
CacheObj
()
{
// TODO
i
nde
xCacheDestroy
(
cache
);
i
d
xCacheDestroy
(
cache
);
}
private:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录