Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
843449f1
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
843449f1
编写于
7月 15, 2022
作者:
dengyihao
提交者:
GitHub
7月 15, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14906 from taosdata/enh/addLRUToIdx
feat:add lru to idx
上级
6a62954e
a6e71424
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
245 addition
and
195 deletion
+245
-195
include/libs/index/index.h
include/libs/index/index.h
+4
-2
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+5
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-7
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+3
-3
source/libs/index/inc/indexFstFile.h
source/libs/index/inc/indexFstFile.h
+3
-1
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+6
-20
source/libs/index/inc/indexTfile.h
source/libs/index/inc/indexTfile.h
+4
-3
source/libs/index/src/index.c
source/libs/index/src/index.c
+43
-23
source/libs/index/src/indexCache.c
source/libs/index/src/indexCache.c
+9
-9
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+6
-6
source/libs/index/src/indexFst.c
source/libs/index/src/indexFst.c
+3
-2
source/libs/index/src/indexFstFile.c
source/libs/index/src/indexFstFile.c
+75
-27
source/libs/index/src/indexTfile.c
source/libs/index/src/indexTfile.c
+20
-34
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+7
-5
source/libs/index/test/fstUT.cc
source/libs/index/test/fstUT.cc
+2
-3
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+32
-29
source/libs/index/test/jsonUT.cc
source/libs/index/test/jsonUT.cc
+20
-21
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+1
-0
未找到文件。
include/libs/index/index.h
浏览文件 @
843449f1
...
@@ -28,7 +28,6 @@ extern "C" {
...
@@ -28,7 +28,6 @@ extern "C" {
typedef
struct
SIndex
SIndex
;
typedef
struct
SIndex
SIndex
;
typedef
struct
SIndexTerm
SIndexTerm
;
typedef
struct
SIndexTerm
SIndexTerm
;
typedef
struct
SIndexOpts
SIndexOpts
;
typedef
struct
SIndexMultiTermQuery
SIndexMultiTermQuery
;
typedef
struct
SIndexMultiTermQuery
SIndexMultiTermQuery
;
typedef
struct
SArray
SIndexMultiTerm
;
typedef
struct
SArray
SIndexMultiTerm
;
...
@@ -62,6 +61,9 @@ typedef enum {
...
@@ -62,6 +61,9 @@ typedef enum {
QUERY_MAX
QUERY_MAX
}
EIndexQueryType
;
}
EIndexQueryType
;
typedef
struct
SIndexOpts
{
int32_t
cacheSize
;
// MB
}
SIndexOpts
;
/*
/*
* create multi query
* create multi query
* @param oper (input, relation between querys)
* @param oper (input, relation between querys)
...
@@ -173,7 +175,7 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms);
...
@@ -173,7 +175,7 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms);
* @param:
* @param:
* @param:
* @param:
*/
*/
SIndexOpts
*
indexOptsCreate
();
SIndexOpts
*
indexOptsCreate
(
int32_t
cacheSize
);
void
indexOptsDestroy
(
SIndexOpts
*
opts
);
void
indexOptsDestroy
(
SIndexOpts
*
opts
);
/*
/*
...
...
source/client/inc/clientInt.h
浏览文件 @
843449f1
...
@@ -314,6 +314,11 @@ int taos_options_imp(TSDB_OPTION option, const char* str);
...
@@ -314,6 +314,11 @@ int taos_options_imp(TSDB_OPTION option, const char* str);
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThreads
);
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThreads
);
typedef
struct
AsyncArg
{
SRpcMsg
msg
;
SEpSet
*
pEpset
;
}
AsyncArg
;
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
);
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
);
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
...
...
source/client/src/clientImpl.c
浏览文件 @
843449f1
...
@@ -1266,13 +1266,8 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
...
@@ -1266,13 +1266,8 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
}
}
}
}
typedef
struct
SchedArg
{
SRpcMsg
msg
;
SEpSet
*
pEpset
;
}
SchedArg
;
int32_t
doProcessMsgFromServer
(
void
*
param
)
{
int32_t
doProcessMsgFromServer
(
void
*
param
)
{
SchedArg
*
arg
=
(
Sched
Arg
*
)
param
;
AsyncArg
*
arg
=
(
Async
Arg
*
)
param
;
SRpcMsg
*
pMsg
=
&
arg
->
msg
;
SRpcMsg
*
pMsg
=
&
arg
->
msg
;
SEpSet
*
pEpSet
=
arg
->
pEpset
;
SEpSet
*
pEpSet
=
arg
->
pEpset
;
...
@@ -1335,7 +1330,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
...
@@ -1335,7 +1330,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
memcpy
((
void
*
)
tEpSet
,
(
void
*
)
pEpSet
,
sizeof
(
SEpSet
));
memcpy
((
void
*
)
tEpSet
,
(
void
*
)
pEpSet
,
sizeof
(
SEpSet
));
}
}
SchedArg
*
arg
=
taosMemoryCalloc
(
1
,
sizeof
(
Sched
Arg
));
AsyncArg
*
arg
=
taosMemoryCalloc
(
1
,
sizeof
(
Async
Arg
));
arg
->
msg
=
*
pMsg
;
arg
->
msg
=
*
pMsg
;
arg
->
pEpset
=
tEpSet
;
arg
->
pEpset
=
tEpSet
;
...
...
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
843449f1
...
@@ -99,12 +99,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
...
@@ -99,12 +99,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto
_err
;
goto
_err
;
}
}
// open pTagIdx
// TODO(yihaoDeng), refactor later
char
indexFullPath
[
128
]
=
{
0
};
char
indexFullPath
[
128
]
=
{
0
};
sprintf
(
indexFullPath
,
"%s/%s"
,
pMeta
->
path
,
"invert"
);
sprintf
(
indexFullPath
,
"%s/%s"
,
pMeta
->
path
,
"invert"
);
taosMkDir
(
indexFullPath
);
taosMkDir
(
indexFullPath
);
ret
=
indexOpen
(
indexOptsCreate
(),
indexFullPath
,
(
SIndex
**
)
&
pMeta
->
pTagIvtIdx
);
SIndexOpts
opts
=
{.
cacheSize
=
8
*
1024
*
1024
};
ret
=
indexOpen
(
&
opts
,
indexFullPath
,
(
SIndex
**
)
&
pMeta
->
pTagIvtIdx
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta tag index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
metaError
(
"vgId:%d, failed to open meta tag index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
goto
_err
;
...
...
source/libs/index/inc/indexFstFile.h
浏览文件 @
843449f1
...
@@ -27,7 +27,7 @@ extern "C" {
...
@@ -27,7 +27,7 @@ extern "C" {
#define DefaultMem 1024 * 1024
#define DefaultMem 1024 * 1024
static
char
tmpFile
[]
=
"./index"
;
static
char
tmpFile
[]
=
"./index"
;
typedef
enum
WriterType
{
TM
emory
,
TFile
}
WriterType
;
typedef
enum
WriterType
{
TM
EMORY
,
TFILE
}
WriterType
;
typedef
struct
IFileCtx
{
typedef
struct
IFileCtx
{
int
(
*
write
)(
struct
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
);
int
(
*
write
)(
struct
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
);
...
@@ -35,6 +35,8 @@ typedef struct IFileCtx {
...
@@ -35,6 +35,8 @@ typedef struct IFileCtx {
int
(
*
flush
)(
struct
IFileCtx
*
ctx
);
int
(
*
flush
)(
struct
IFileCtx
*
ctx
);
int
(
*
readFrom
)(
struct
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
,
int32_t
offset
);
int
(
*
readFrom
)(
struct
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
,
int32_t
offset
);
int
(
*
size
)(
struct
IFileCtx
*
ctx
);
int
(
*
size
)(
struct
IFileCtx
*
ctx
);
SLRUCache
*
lru
;
WriterType
type
;
WriterType
type
;
union
{
union
{
struct
{
struct
{
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
843449f1
...
@@ -24,12 +24,9 @@
...
@@ -24,12 +24,9 @@
#include "tchecksum.h"
#include "tchecksum.h"
#include "thash.h"
#include "thash.h"
#include "tlog.h"
#include "tlog.h"
#include "tlrucache.h"
#include "tutil.h"
#include "tutil.h"
#ifdef USE_LUCENE
#include <lucene++/Lucene_c.h>
#endif
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
...
@@ -61,28 +58,17 @@ struct SIndex {
...
@@ -61,28 +58,17 @@ struct SIndex {
void
*
tindex
;
void
*
tindex
;
SHashObj
*
colObj
;
// < field name, field id>
SHashObj
*
colObj
;
// < field name, field id>
int64_t
suid
;
// current super table id, -1 is normal table
int64_t
suid
;
// current super table id, -1 is normal table
int32_t
cVersion
;
// current version allocated to cache
int32_t
cVersion
;
// current version allocated to cache
SLRUCache
*
lru
;
char
*
path
;
char
*
path
;
int8_t
status
;
int8_t
status
;
SIndexStat
stat
;
SIndexStat
stat
;
TdThreadMutex
mtx
;
TdThreadMutex
mtx
;
tsem_t
sem
;
tsem_t
sem
;
bool
quit
;
bool
quit
;
};
SIndexOpts
opts
;
struct
SIndexOpts
{
#ifdef USE_LUCENE
void
*
opts
;
#endif
#ifdef USE_INVERTED_INDEX
int32_t
cacheSize
;
// MB
// add cache module later
#endif
int32_t
cacheOpt
;
// MB
};
};
struct
SIndexMultiTermQuery
{
struct
SIndexMultiTermQuery
{
...
...
source/libs/index/inc/indexTfile.h
浏览文件 @
843449f1
...
@@ -71,6 +71,7 @@ typedef struct TFileReader {
...
@@ -71,6 +71,7 @@ typedef struct TFileReader {
IFileCtx
*
ctx
;
IFileCtx
*
ctx
;
TFileHeader
header
;
TFileHeader
header
;
bool
remove
;
bool
remove
;
void
*
lru
;
}
TFileReader
;
}
TFileReader
;
typedef
struct
IndexTFile
{
typedef
struct
IndexTFile
{
...
@@ -95,14 +96,14 @@ typedef struct TFileReaderOpt {
...
@@ -95,14 +96,14 @@ typedef struct TFileReaderOpt {
}
TFileReaderOpt
;
}
TFileReaderOpt
;
// tfile cache, manage tindex reader
// tfile cache, manage tindex reader
TFileCache
*
tfileCacheCreate
(
const
char
*
path
);
TFileCache
*
tfileCacheCreate
(
SIndex
*
idx
,
const
char
*
path
);
void
tfileCacheDestroy
(
TFileCache
*
tcache
);
void
tfileCacheDestroy
(
TFileCache
*
tcache
);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
ICacheKey
*
key
);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
ICacheKey
*
key
);
void
tfileCachePut
(
TFileCache
*
tcache
,
ICacheKey
*
key
,
TFileReader
*
reader
);
void
tfileCachePut
(
TFileCache
*
tcache
,
ICacheKey
*
key
,
TFileReader
*
reader
);
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
uint64_t
suid
,
char
*
colName
);
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
uint64_t
suid
,
char
*
colName
);
TFileReader
*
tfileReaderOpen
(
char
*
path
,
uint64_t
suid
,
int64_t
version
,
const
char
*
colName
);
TFileReader
*
tfileReaderOpen
(
SIndex
*
idx
,
uint64_t
suid
,
int64_t
version
,
const
char
*
colName
);
TFileReader
*
tfileReaderCreate
(
IFileCtx
*
ctx
);
TFileReader
*
tfileReaderCreate
(
IFileCtx
*
ctx
);
void
tfileReaderDestroy
(
TFileReader
*
reader
);
void
tfileReaderDestroy
(
TFileReader
*
reader
);
int
tfileReaderSearch
(
TFileReader
*
reader
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
);
int
tfileReaderSearch
(
TFileReader
*
reader
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
);
...
@@ -117,7 +118,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order);
...
@@ -117,7 +118,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int
tfileWriterFinish
(
TFileWriter
*
tw
);
int
tfileWriterFinish
(
TFileWriter
*
tw
);
//
//
IndexTFile
*
idxTFileCreate
(
const
char
*
path
);
IndexTFile
*
idxTFileCreate
(
SIndex
*
idx
,
const
char
*
path
);
void
idxTFileDestroy
(
IndexTFile
*
tfile
);
void
idxTFileDestroy
(
IndexTFile
*
tfile
);
int
idxTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
);
int
idxTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
);
int
idxTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
);
int
idxTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SIdxTRslt
*
tr
);
...
...
source/libs/index/src/index.c
浏览文件 @
843449f1
...
@@ -103,44 +103,59 @@ static void indexWait(void* idx) {
...
@@ -103,44 +103,59 @@ static void indexWait(void* idx) {
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
int
ret
=
TSDB_CODE_SUCCESS
;
int
ret
=
TSDB_CODE_SUCCESS
;
taosThreadOnce
(
&
isInit
,
indexInit
);
taosThreadOnce
(
&
isInit
,
indexInit
);
SIndex
*
sI
dx
=
taosMemoryCalloc
(
1
,
sizeof
(
SIndex
));
SIndex
*
i
dx
=
taosMemoryCalloc
(
1
,
sizeof
(
SIndex
));
if
(
sI
dx
==
NULL
)
{
if
(
i
dx
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
sIdx
->
tindex
=
idxTFileCreate
(
path
);
idx
->
lru
=
taosLRUCacheInit
(
opts
->
cacheSize
,
-
1
,
.
5
);
if
(
sIdx
->
tindex
==
NULL
)
{
if
(
idx
->
lru
==
NULL
)
{
ret
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
END
;
}
taosLRUCacheSetStrictCapacity
(
idx
->
lru
,
false
);
idx
->
tindex
=
idxTFileCreate
(
idx
,
path
);
if
(
idx
->
tindex
==
NULL
)
{
ret
=
TSDB_CODE_OUT_OF_MEMORY
;
ret
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
END
;
goto
END
;
}
}
sI
dx
->
colObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
i
dx
->
colObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sI
dx
->
cVersion
=
1
;
i
dx
->
cVersion
=
1
;
sI
dx
->
path
=
tstrdup
(
path
);
i
dx
->
path
=
tstrdup
(
path
);
taosThreadMutexInit
(
&
sI
dx
->
mtx
,
NULL
);
taosThreadMutexInit
(
&
i
dx
->
mtx
,
NULL
);
tsem_init
(
&
sI
dx
->
sem
,
0
,
0
);
tsem_init
(
&
i
dx
->
sem
,
0
,
0
);
sIdx
->
refId
=
idxAddRef
(
sIdx
);
idx
->
refId
=
idxAddRef
(
idx
);
idxAcquireRef
(
sIdx
->
refId
);
idx
->
opts
=
*
opts
;
idxAcquireRef
(
idx
->
refId
);
*
index
=
sI
dx
;
*
index
=
i
dx
;
return
ret
;
return
ret
;
END:
END:
if
(
sI
dx
!=
NULL
)
{
if
(
i
dx
!=
NULL
)
{
indexClose
(
sI
dx
);
indexClose
(
i
dx
);
}
}
*
index
=
NULL
;
*
index
=
NULL
;
return
ret
;
return
ret
;
}
}
void
indexDestroy
(
void
*
handle
)
{
void
indexDestroy
(
void
*
handle
)
{
SIndex
*
sIdx
=
handle
;
SIndex
*
idx
=
handle
;
taosThreadMutexDestroy
(
&
sIdx
->
mtx
);
taosThreadMutexDestroy
(
&
idx
->
mtx
);
tsem_destroy
(
&
sIdx
->
sem
);
tsem_destroy
(
&
idx
->
sem
);
idxTFileDestroy
(
sIdx
->
tindex
);
idxTFileDestroy
(
idx
->
tindex
);
taosMemoryFree
(
sIdx
->
path
);
taosMemoryFree
(
idx
->
path
);
taosMemoryFree
(
sIdx
);
SLRUCache
*
lru
=
idx
->
lru
;
if
(
lru
!=
NULL
)
{
taosLRUCacheEraseUnrefEntries
(
lru
);
taosLRUCacheCleanup
(
lru
);
}
idx
->
lru
=
NULL
;
taosMemoryFree
(
idx
);
return
;
return
;
}
}
void
indexClose
(
SIndex
*
sIdx
)
{
void
indexClose
(
SIndex
*
sIdx
)
{
...
@@ -159,6 +174,7 @@ void indexClose(SIndex* sIdx) {
...
@@ -159,6 +174,7 @@ void indexClose(SIndex* sIdx) {
taosHashCleanup
(
sIdx
->
colObj
);
taosHashCleanup
(
sIdx
->
colObj
);
sIdx
->
colObj
=
NULL
;
sIdx
->
colObj
=
NULL
;
}
}
idxReleaseRef
(
sIdx
->
refId
);
idxReleaseRef
(
sIdx
->
refId
);
idxRemoveRef
(
sIdx
->
refId
);
idxRemoveRef
(
sIdx
->
refId
);
}
}
...
@@ -234,8 +250,12 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
...
@@ -234,8 +250,12 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
)
{
return
1
;
}
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
)
{
return
1
;
}
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
SIndexOpts
*
indexOptsCreate
()
{
return
NULL
;
}
SIndexOpts
*
indexOptsCreate
(
int32_t
cacheSize
)
{
void
indexOptsDestroy
(
SIndexOpts
*
opts
)
{
return
;
}
SIndexOpts
*
opts
=
taosMemoryCalloc
(
1
,
sizeof
(
SIndexOpts
));
opts
->
cacheSize
=
cacheSize
;
return
opts
;
}
void
indexOptsDestroy
(
SIndexOpts
*
opts
)
{
return
taosMemoryFree
(
opts
);
}
/*
/*
* @param: oper
* @param: oper
*
*
...
@@ -641,7 +661,7 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
...
@@ -641,7 +661,7 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
}
}
tfileWriterClose
(
tw
);
tfileWriterClose
(
tw
);
TFileReader
*
reader
=
tfileReaderOpen
(
sIdx
->
path
,
cache
->
suid
,
version
,
cache
->
colName
);
TFileReader
*
reader
=
tfileReaderOpen
(
sIdx
,
cache
->
suid
,
version
,
cache
->
colName
);
if
(
reader
==
NULL
)
{
if
(
reader
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
...
...
source/libs/index/src/indexCache.c
浏览文件 @
843449f1
...
@@ -462,8 +462,8 @@ Iterate* idxCacheIteratorCreate(IndexCache* cache) {
...
@@ -462,8 +462,8 @@ Iterate* idxCacheIteratorCreate(IndexCache* cache) {
if
(
cache
->
imm
==
NULL
)
{
if
(
cache
->
imm
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
Iterate
*
i
i
ter
=
taosMemoryCalloc
(
1
,
sizeof
(
Iterate
));
Iterate
*
iter
=
taosMemoryCalloc
(
1
,
sizeof
(
Iterate
));
if
(
i
i
ter
==
NULL
)
{
if
(
iter
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
taosThreadMutexLock
(
&
cache
->
mtx
);
taosThreadMutexLock
(
&
cache
->
mtx
);
...
@@ -471,15 +471,15 @@ Iterate* idxCacheIteratorCreate(IndexCache* cache) {
...
@@ -471,15 +471,15 @@ Iterate* idxCacheIteratorCreate(IndexCache* cache) {
idxMemRef
(
cache
->
imm
);
idxMemRef
(
cache
->
imm
);
MemTable
*
tbl
=
cache
->
imm
;
MemTable
*
tbl
=
cache
->
imm
;
i
i
ter
->
val
.
val
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
iter
->
val
.
val
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
i
i
ter
->
val
.
colVal
=
NULL
;
iter
->
val
.
colVal
=
NULL
;
i
i
ter
->
iter
=
tbl
!=
NULL
?
tSkipListCreateIter
(
tbl
->
mem
)
:
NULL
;
iter
->
iter
=
tbl
!=
NULL
?
tSkipListCreateIter
(
tbl
->
mem
)
:
NULL
;
i
i
ter
->
next
=
idxCacheIteratorNext
;
iter
->
next
=
idxCacheIteratorNext
;
i
i
ter
->
getValue
=
idxCacheIteratorGetValue
;
iter
->
getValue
=
idxCacheIteratorGetValue
;
taosThreadMutexUnlock
(
&
cache
->
mtx
);
taosThreadMutexUnlock
(
&
cache
->
mtx
);
return
i
i
ter
;
return
iter
;
}
}
void
idxCacheIteratorDestroy
(
Iterate
*
iter
)
{
void
idxCacheIteratorDestroy
(
Iterate
*
iter
)
{
if
(
iter
==
NULL
)
{
if
(
iter
==
NULL
)
{
...
@@ -564,13 +564,13 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
...
@@ -564,13 +564,13 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
idxMemUnRef
(
tbl
);
idxMemUnRef
(
tbl
);
taosThreadMutexUnlock
(
&
pCache
->
mtx
);
taosThreadMutexUnlock
(
&
pCache
->
mtx
);
idxCacheUnRef
(
pCache
);
idxCacheUnRef
(
pCache
);
return
0
;
return
0
;
// encode end
// encode end
}
}
void
idxCacheForceToMerge
(
void
*
cache
)
{
void
idxCacheForceToMerge
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
IndexCache
*
pCache
=
cache
;
idxCacheRef
(
pCache
);
idxCacheRef
(
pCache
);
taosThreadMutexLock
(
&
pCache
->
mtx
);
taosThreadMutexLock
(
&
pCache
->
mtx
);
...
...
source/libs/index/src/indexFilter.c
浏览文件 @
843449f1
...
@@ -31,7 +31,7 @@ typedef struct SIFParam {
...
@@ -31,7 +31,7 @@ typedef struct SIFParam {
SHashObj
*
pFilter
;
SHashObj
*
pFilter
;
SArray
*
result
;
SArray
*
result
;
char
*
condValue
;
char
*
condValue
;
SIdxFltStatus
status
;
SIdxFltStatus
status
;
uint8_t
colValType
;
uint8_t
colValType
;
...
@@ -45,7 +45,7 @@ typedef struct SIFParam {
...
@@ -45,7 +45,7 @@ typedef struct SIFParam {
typedef
struct
SIFCtx
{
typedef
struct
SIFCtx
{
int32_t
code
;
int32_t
code
;
SHashObj
*
pRes
;
/* element is SIFParam */
SHashObj
*
pRes
;
/* element is SIFParam */
bool
noExec
;
// true: just iterate condition tree, and add hint to executor plan
bool
noExec
;
// true: just iterate condition tree, and add hint to executor plan
SIndexMetaArg
arg
;
SIndexMetaArg
arg
;
// SIdxFltStatus st;
// SIdxFltStatus st;
...
@@ -137,7 +137,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
...
@@ -137,7 +137,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
// covert data From snode;
// covert data From snode;
SValueNode
*
vn
=
(
SValueNode
*
)
node
;
SValueNode
*
vn
=
(
SValueNode
*
)
node
;
char
*
pData
=
nodesGetValueFromNode
(
vn
);
char
*
pData
=
nodesGetValueFromNode
(
vn
);
SDataType
*
pType
=
&
vn
->
node
.
resType
;
SDataType
*
pType
=
&
vn
->
node
.
resType
;
int32_t
type
=
pType
->
type
;
int32_t
type
=
pType
->
type
;
int32_t
valLen
=
0
;
int32_t
valLen
=
0
;
...
@@ -175,7 +175,7 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
...
@@ -175,7 +175,7 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SOperatorNode
*
nd
=
(
SOperatorNode
*
)
node
;
SOperatorNode
*
nd
=
(
SOperatorNode
*
)
node
;
assert
(
nodeType
(
node
)
==
QUERY_NODE_OPERATOR
);
assert
(
nodeType
(
node
)
==
QUERY_NODE_OPERATOR
);
SColumnNode
*
l
=
(
SColumnNode
*
)
nd
->
pLeft
;
SColumnNode
*
l
=
(
SColumnNode
*
)
nd
->
pLeft
;
SValueNode
*
r
=
(
SValueNode
*
)
nd
->
pRight
;
SValueNode
*
r
=
(
SValueNode
*
)
nd
->
pRight
;
param
->
colId
=
l
->
colId
;
param
->
colId
=
l
->
colId
;
param
->
colValType
=
l
->
node
.
resType
.
type
;
param
->
colValType
=
l
->
node
.
resType
.
type
;
...
@@ -357,7 +357,7 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
...
@@ -357,7 +357,7 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
static
int32_t
sifDoIndex
(
SIFParam
*
left
,
SIFParam
*
right
,
int8_t
operType
,
SIFParam
*
output
)
{
static
int32_t
sifDoIndex
(
SIFParam
*
left
,
SIFParam
*
right
,
int8_t
operType
,
SIFParam
*
output
)
{
int
ret
=
0
;
int
ret
=
0
;
SIndexMetaArg
*
arg
=
&
output
->
arg
;
SIndexMetaArg
*
arg
=
&
output
->
arg
;
EIndexQueryType
qtype
=
0
;
EIndexQueryType
qtype
=
0
;
SIF_ERR_RET
(
sifGetFuncFromSql
(
operType
,
&
qtype
));
SIF_ERR_RET
(
sifGetFuncFromSql
(
operType
,
&
qtype
));
if
(
left
->
colValType
==
TSDB_DATA_TYPE_JSON
)
{
if
(
left
->
colValType
==
TSDB_DATA_TYPE_JSON
)
{
...
@@ -749,7 +749,7 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
...
@@ -749,7 +749,7 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
SFilterInfo
*
filter
=
NULL
;
SFilterInfo
*
filter
=
NULL
;
SArray
*
output
=
taosArrayInit
(
8
,
sizeof
(
uint64_t
));
SArray
*
output
=
taosArrayInit
(
8
,
sizeof
(
uint64_t
));
SIFParam
param
=
{.
arg
=
*
metaArg
,
.
result
=
output
};
SIFParam
param
=
{.
arg
=
*
metaArg
,
.
result
=
output
};
SIF_ERR_RET
(
sifCalculate
((
SNode
*
)
pFilterNode
,
&
param
));
SIF_ERR_RET
(
sifCalculate
((
SNode
*
)
pFilterNode
,
&
param
));
...
...
source/libs/index/src/indexFst.c
浏览文件 @
843449f1
...
@@ -772,6 +772,7 @@ void fstBuilderDestroy(FstBuilder* b) {
...
@@ -772,6 +772,7 @@ void fstBuilderDestroy(FstBuilder* b) {
if
(
b
==
NULL
)
{
if
(
b
==
NULL
)
{
return
;
return
;
}
}
fstBuilderFinish
(
b
);
idxFileDestroy
(
b
->
wrt
);
idxFileDestroy
(
b
->
wrt
);
fstUnFinishedNodesDestroy
(
b
->
unfinished
);
fstUnFinishedNodesDestroy
(
b
->
unfinished
);
...
@@ -1074,8 +1075,8 @@ FStmStBuilder* fstSearchWithState(Fst* fst, FAutoCtx* ctx) {
...
@@ -1074,8 +1075,8 @@ FStmStBuilder* fstSearchWithState(Fst* fst, FAutoCtx* ctx) {
}
}
FstNode
*
fstGetRoot
(
Fst
*
fst
)
{
FstNode
*
fstGetRoot
(
Fst
*
fst
)
{
CompiledAddr
rA
ddr
=
fstGetRootAddr
(
fst
);
CompiledAddr
a
ddr
=
fstGetRootAddr
(
fst
);
return
fstGetNode
(
fst
,
rA
ddr
);
return
fstGetNode
(
fst
,
a
ddr
);
}
}
FstNode
*
fstGetNode
(
Fst
*
fst
,
CompiledAddr
addr
)
{
FstNode
*
fstGetNode
(
Fst
*
fst
,
CompiledAddr
addr
)
{
...
...
source/libs/index/src/indexFstFile.c
浏览文件 @
843449f1
...
@@ -4,8 +4,7 @@
...
@@ -4,8 +4,7 @@
* This program is free software: you can use, redistribute, and/or modify
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* or later ("AGPL"), as published by the Free Software Foundation.
*
* * This program is distributed in the hope that it will be useful, but WITHOUT
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
* FITNESS FOR A PARTICULAR PURPOSE.
*
*
...
@@ -14,13 +13,32 @@
...
@@ -14,13 +13,32 @@
*/
*/
#include "indexFstFile.h"
#include "indexFstFile.h"
#include "indexComm.h"
#include "indexFstUtil.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexInt.h"
#include "indexUtil.h"
#include "os.h"
#include "os.h"
#include "tutil.h"
#include "tutil.h"
static
int32_t
kBlockSize
=
4096
;
typedef
struct
{
int32_t
blockId
;
int32_t
nread
;
char
buf
[
0
];
}
SDataBlock
;
static
void
deleteDataBlockFromLRU
(
const
void
*
key
,
size_t
keyLen
,
void
*
value
)
{
taosMemoryFree
(
value
);
}
static
void
idxGenLRUKey
(
char
*
buf
,
const
char
*
path
,
int32_t
blockId
)
{
char
*
p
=
buf
;
SERIALIZE_STR_VAR_TO_BUF
(
p
,
path
,
strlen
(
path
));
SERIALIZE_VAR_TO_BUF
(
p
,
'_'
,
char
);
idxInt2str
(
blockId
,
p
,
0
);
return
;
}
static
int
idxFileCtxDoWrite
(
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
)
{
static
int
idxFileCtxDoWrite
(
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
)
{
if
(
ctx
->
type
==
TF
ile
)
{
if
(
ctx
->
type
==
TF
ILE
)
{
assert
(
len
==
taosWriteFile
(
ctx
->
file
.
pFile
,
buf
,
len
));
assert
(
len
==
taosWriteFile
(
ctx
->
file
.
pFile
,
buf
,
len
));
}
else
{
}
else
{
memcpy
(
ctx
->
mem
.
buf
+
ctx
->
offset
,
buf
,
len
);
memcpy
(
ctx
->
mem
.
buf
+
ctx
->
offset
,
buf
,
len
);
...
@@ -30,7 +48,7 @@ static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
...
@@ -30,7 +48,7 @@ static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
}
}
static
int
idxFileCtxDoRead
(
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
)
{
static
int
idxFileCtxDoRead
(
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
)
{
int
nRead
=
0
;
int
nRead
=
0
;
if
(
ctx
->
type
==
TF
ile
)
{
if
(
ctx
->
type
==
TF
ILE
)
{
#ifdef USE_MMAP
#ifdef USE_MMAP
nRead
=
len
<
ctx
->
file
.
size
?
len
:
ctx
->
file
.
size
;
nRead
=
len
<
ctx
->
file
.
size
?
len
:
ctx
->
file
.
size
;
memcpy
(
buf
,
ctx
->
file
.
ptr
,
nRead
);
memcpy
(
buf
,
ctx
->
file
.
ptr
,
nRead
);
...
@@ -45,24 +63,54 @@ static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
...
@@ -45,24 +63,54 @@ static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
return
nRead
;
return
nRead
;
}
}
static
int
idxFileCtxDoReadFrom
(
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
,
int32_t
offset
)
{
static
int
idxFileCtxDoReadFrom
(
IFileCtx
*
ctx
,
uint8_t
*
buf
,
int
len
,
int32_t
offset
)
{
int
nRead
=
0
;
int32_t
total
=
0
,
nread
=
0
;
if
(
ctx
->
type
==
TFile
)
{
int32_t
blkId
=
offset
/
kBlockSize
;
// tfLseek(ctx->file.pFile, offset, 0);
int32_t
blkOffset
=
offset
%
kBlockSize
;
#ifdef USE_MMAP
int32_t
blkLeft
=
kBlockSize
-
blkOffset
;
int32_t
last
=
ctx
->
file
.
size
-
offset
;
nRead
=
last
>=
len
?
len
:
last
;
do
{
memcpy
(
buf
,
ctx
->
file
.
ptr
+
offset
,
nRead
);
char
key
[
128
]
=
{
0
};
#else
idxGenLRUKey
(
key
,
ctx
->
file
.
buf
,
blkId
);
nRead
=
taosPReadFile
(
ctx
->
file
.
pFile
,
buf
,
len
,
offset
);
LRUHandle
*
h
=
taosLRUCacheLookup
(
ctx
->
lru
,
key
,
strlen
(
key
));
#endif
}
else
{
if
(
h
)
{
// refactor later
SDataBlock
*
blk
=
taosLRUCacheValue
(
ctx
->
lru
,
h
);
assert
(
0
);
nread
=
TMIN
(
blkLeft
,
len
);
}
memcpy
(
buf
+
total
,
blk
->
buf
+
blkOffset
,
nread
);
return
nRead
;
taosLRUCacheRelease
(
ctx
->
lru
,
h
,
false
);
}
else
{
int32_t
cacheMemSize
=
sizeof
(
SDataBlock
)
+
kBlockSize
;
SDataBlock
*
blk
=
taosMemoryCalloc
(
1
,
cacheMemSize
);
blk
->
blockId
=
blkId
;
blk
->
nread
=
taosPReadFile
(
ctx
->
file
.
pFile
,
blk
->
buf
,
kBlockSize
,
blkId
*
kBlockSize
);
assert
(
blk
->
nread
<=
kBlockSize
);
nread
=
TMIN
(
blkLeft
,
len
);
if
(
blk
->
nread
<
kBlockSize
&&
blk
->
nread
<
len
)
{
break
;
}
memcpy
(
buf
+
total
,
blk
->
buf
+
blkOffset
,
nread
);
LRUStatus
s
=
taosLRUCacheInsert
(
ctx
->
lru
,
key
,
strlen
(
key
),
blk
,
cacheMemSize
,
deleteDataBlockFromLRU
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
if
(
s
!=
TAOS_LRU_STATUS_OK
)
{
return
-
1
;
}
}
total
+=
nread
;
len
-=
nread
;
offset
+=
nread
;
blkId
=
offset
/
kBlockSize
;
blkOffset
=
offset
%
kBlockSize
;
blkLeft
=
kBlockSize
-
blkOffset
;
}
while
(
len
>
0
);
return
total
;
}
}
static
int
idxFileCtxGetSize
(
IFileCtx
*
ctx
)
{
static
int
idxFileCtxGetSize
(
IFileCtx
*
ctx
)
{
if
(
ctx
->
type
==
TF
ile
)
{
if
(
ctx
->
type
==
TF
ILE
)
{
int64_t
file_size
=
0
;
int64_t
file_size
=
0
;
taosStatFile
(
ctx
->
file
.
buf
,
&
file_size
,
NULL
);
taosStatFile
(
ctx
->
file
.
buf
,
&
file_size
,
NULL
);
return
(
int
)
file_size
;
return
(
int
)
file_size
;
...
@@ -70,7 +118,7 @@ static int idxFileCtxGetSize(IFileCtx* ctx) {
...
@@ -70,7 +118,7 @@ static int idxFileCtxGetSize(IFileCtx* ctx) {
return
0
;
return
0
;
}
}
static
int
idxFileCtxDoFlush
(
IFileCtx
*
ctx
)
{
static
int
idxFileCtxDoFlush
(
IFileCtx
*
ctx
)
{
if
(
ctx
->
type
==
TF
ile
)
{
if
(
ctx
->
type
==
TF
ILE
)
{
taosFsyncFile
(
ctx
->
file
.
pFile
);
taosFsyncFile
(
ctx
->
file
.
pFile
);
}
else
{
}
else
{
// do nothing
// do nothing
...
@@ -85,7 +133,7 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
...
@@ -85,7 +133,7 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
}
}
ctx
->
type
=
type
;
ctx
->
type
=
type
;
if
(
ctx
->
type
==
TF
ile
)
{
if
(
ctx
->
type
==
TF
ILE
)
{
// ugly code, refactor later
// ugly code, refactor later
ctx
->
file
.
readOnly
=
readOnly
;
ctx
->
file
.
readOnly
=
readOnly
;
memcpy
(
ctx
->
file
.
buf
,
path
,
strlen
(
path
));
memcpy
(
ctx
->
file
.
buf
,
path
,
strlen
(
path
));
...
@@ -93,8 +141,6 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
...
@@ -93,8 +141,6 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
ctx
->
file
.
pFile
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
);
ctx
->
file
.
pFile
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
);
taosFtruncateFile
(
ctx
->
file
.
pFile
,
0
);
taosFtruncateFile
(
ctx
->
file
.
pFile
,
0
);
taosStatFile
(
path
,
&
ctx
->
file
.
size
,
NULL
);
taosStatFile
(
path
,
&
ctx
->
file
.
size
,
NULL
);
// ctx->file.size = (int)size;
}
else
{
}
else
{
ctx
->
file
.
pFile
=
taosOpenFile
(
path
,
TD_FILE_READ
);
ctx
->
file
.
pFile
=
taosOpenFile
(
path
,
TD_FILE_READ
);
...
@@ -109,10 +155,11 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
...
@@ -109,10 +155,11 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
indexError
(
"failed to open file, error %d"
,
errno
);
indexError
(
"failed to open file, error %d"
,
errno
);
goto
END
;
goto
END
;
}
}
}
else
if
(
ctx
->
type
==
TM
emory
)
{
}
else
if
(
ctx
->
type
==
TM
EMORY
)
{
ctx
->
mem
.
buf
=
taosMemoryCalloc
(
1
,
sizeof
(
char
)
*
capacity
);
ctx
->
mem
.
buf
=
taosMemoryCalloc
(
1
,
sizeof
(
char
)
*
capacity
);
ctx
->
mem
.
cap
=
capacity
;
ctx
->
mem
.
cap
=
capacity
;
}
}
ctx
->
write
=
idxFileCtxDoWrite
;
ctx
->
write
=
idxFileCtxDoWrite
;
ctx
->
read
=
idxFileCtxDoRead
;
ctx
->
read
=
idxFileCtxDoRead
;
ctx
->
flush
=
idxFileCtxDoFlush
;
ctx
->
flush
=
idxFileCtxDoFlush
;
...
@@ -124,14 +171,14 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
...
@@ -124,14 +171,14 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
return
ctx
;
return
ctx
;
END:
END:
if
(
ctx
->
type
==
TM
emory
)
{
if
(
ctx
->
type
==
TM
EMORY
)
{
taosMemoryFree
(
ctx
->
mem
.
buf
);
taosMemoryFree
(
ctx
->
mem
.
buf
);
}
}
taosMemoryFree
(
ctx
);
taosMemoryFree
(
ctx
);
return
NULL
;
return
NULL
;
}
}
void
idxFileCtxDestroy
(
IFileCtx
*
ctx
,
bool
remove
)
{
void
idxFileCtxDestroy
(
IFileCtx
*
ctx
,
bool
remove
)
{
if
(
ctx
->
type
==
TM
emory
)
{
if
(
ctx
->
type
==
TM
EMORY
)
{
taosMemoryFree
(
ctx
->
mem
.
buf
);
taosMemoryFree
(
ctx
->
mem
.
buf
);
}
else
{
}
else
{
ctx
->
flush
(
ctx
);
ctx
->
flush
(
ctx
);
...
@@ -183,6 +230,7 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
...
@@ -183,6 +230,7 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
write
->
summer
=
taosCalcChecksum
(
write
->
summer
,
buf
,
len
);
write
->
summer
=
taosCalcChecksum
(
write
->
summer
,
buf
,
len
);
return
len
;
return
len
;
}
}
int
idxFileRead
(
IdxFstFile
*
write
,
uint8_t
*
buf
,
uint32_t
len
)
{
int
idxFileRead
(
IdxFstFile
*
write
,
uint8_t
*
buf
,
uint32_t
len
)
{
if
(
write
==
NULL
)
{
if
(
write
==
NULL
)
{
return
0
;
return
0
;
...
...
source/libs/index/src/indexTfile.c
浏览文件 @
843449f1
...
@@ -90,7 +90,7 @@ static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt
...
@@ -90,7 +90,7 @@ static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt
{
tfSearchEqual_JSON
,
tfSearchPrefix_JSON
,
tfSearchSuffix_JSON
,
tfSearchRegex_JSON
,
tfSearchLessThan_JSON
,
{
tfSearchEqual_JSON
,
tfSearchPrefix_JSON
,
tfSearchSuffix_JSON
,
tfSearchRegex_JSON
,
tfSearchLessThan_JSON
,
tfSearchLessEqual_JSON
,
tfSearchGreaterThan_JSON
,
tfSearchGreaterEqual_JSON
,
tfSearchRange_JSON
}};
tfSearchLessEqual_JSON
,
tfSearchGreaterThan_JSON
,
tfSearchGreaterEqual_JSON
,
tfSearchRange_JSON
}};
TFileCache
*
tfileCacheCreate
(
const
char
*
path
)
{
TFileCache
*
tfileCacheCreate
(
SIndex
*
idx
,
const
char
*
path
)
{
TFileCache
*
tcache
=
taosMemoryCalloc
(
1
,
sizeof
(
TFileCache
));
TFileCache
*
tcache
=
taosMemoryCalloc
(
1
,
sizeof
(
TFileCache
));
if
(
tcache
==
NULL
)
{
if
(
tcache
==
NULL
)
{
return
NULL
;
return
NULL
;
...
@@ -103,17 +103,20 @@ TFileCache* tfileCacheCreate(const char* path) {
...
@@ -103,17 +103,20 @@ TFileCache* tfileCacheCreate(const char* path) {
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
files
);
i
++
)
{
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
files
);
i
++
)
{
char
*
file
=
taosArrayGetP
(
files
,
i
);
char
*
file
=
taosArrayGetP
(
files
,
i
);
IFileCtx
*
wc
=
idxFileCtxCreate
(
TFile
,
file
,
true
,
1024
*
1024
*
64
);
IFileCtx
*
ctx
=
idxFileCtxCreate
(
TFILE
,
file
,
true
,
1024
*
1024
*
64
);
if
(
wc
==
NULL
)
{
if
(
ctx
==
NULL
)
{
indexError
(
"failed to open index:%s"
,
file
);
indexError
(
"failed to open index:%s"
,
file
);
goto
End
;
goto
End
;
}
}
ctx
->
lru
=
idx
->
lru
;
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
TFileReader
*
reader
=
tfileReaderCreate
(
ctx
);
if
(
reader
==
NULL
)
{
if
(
reader
==
NULL
)
{
indexInfo
(
"skip invalid file: %s"
,
file
);
indexInfo
(
"skip invalid file: %s"
,
file
);
continue
;
continue
;
}
}
reader
->
lru
=
idx
->
lru
;
TFileHeader
*
header
=
&
reader
->
header
;
TFileHeader
*
header
=
&
reader
->
header
;
ICacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
(
int32_t
)
strlen
(
header
->
colName
)};
ICacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
(
int32_t
)
strlen
(
header
->
colName
)};
...
@@ -160,9 +163,8 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
...
@@ -160,9 +163,8 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
return
*
reader
;
return
*
reader
;
}
}
void
tfileCachePut
(
TFileCache
*
tcache
,
ICacheKey
*
key
,
TFileReader
*
reader
)
{
void
tfileCachePut
(
TFileCache
*
tcache
,
ICacheKey
*
key
,
TFileReader
*
reader
)
{
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
idxSerialCacheKey
(
key
,
buf
);
int32_t
sz
=
idxSerialCacheKey
(
key
,
buf
);
// remove last version index reader
TFileReader
**
p
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
sz
);
TFileReader
**
p
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
sz
);
if
(
p
!=
NULL
&&
*
p
!=
NULL
)
{
if
(
p
!=
NULL
&&
*
p
!=
NULL
)
{
TFileReader
*
oldRdr
=
*
p
;
TFileReader
*
oldRdr
=
*
p
;
...
@@ -493,7 +495,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
...
@@ -493,7 +495,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
char
fullname
[
256
]
=
{
0
};
char
fullname
[
256
]
=
{
0
};
tfileGenFileFullName
(
fullname
,
path
,
suid
,
colName
,
version
);
tfileGenFileFullName
(
fullname
,
path
,
suid
,
colName
,
version
);
// indexInfo("open write file name %s", fullname);
// indexInfo("open write file name %s", fullname);
IFileCtx
*
wcx
=
idxFileCtxCreate
(
TF
ile
,
fullname
,
false
,
1024
*
1024
*
64
);
IFileCtx
*
wcx
=
idxFileCtxCreate
(
TF
ILE
,
fullname
,
false
,
1024
*
1024
*
64
);
if
(
wcx
==
NULL
)
{
if
(
wcx
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
...
@@ -506,16 +508,17 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
...
@@ -506,16 +508,17 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
return
tfileWriterCreate
(
wcx
,
&
tfh
);
return
tfileWriterCreate
(
wcx
,
&
tfh
);
}
}
TFileReader
*
tfileReaderOpen
(
char
*
path
,
uint64_t
suid
,
int64_t
version
,
const
char
*
colName
)
{
TFileReader
*
tfileReaderOpen
(
SIndex
*
idx
,
uint64_t
suid
,
int64_t
version
,
const
char
*
colName
)
{
char
fullname
[
256
]
=
{
0
};
char
fullname
[
256
]
=
{
0
};
tfileGenFileFullName
(
fullname
,
path
,
suid
,
colName
,
version
);
tfileGenFileFullName
(
fullname
,
idx
->
path
,
suid
,
colName
,
version
);
IFileCtx
*
wc
=
idxFileCtxCreate
(
TF
ile
,
fullname
,
true
,
1024
*
1024
*
1024
);
IFileCtx
*
wc
=
idxFileCtxCreate
(
TF
ILE
,
fullname
,
true
,
1024
*
1024
*
1024
);
if
(
wc
==
NULL
)
{
if
(
wc
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
indexError
(
"failed to open readonly file: %s, reason: %s"
,
fullname
,
terrstr
());
indexError
(
"failed to open readonly file: %s, reason: %s"
,
fullname
,
terrstr
());
return
NULL
;
return
NULL
;
}
}
wc
->
lru
=
idx
->
lru
;
indexTrace
(
"open read file name:%s, file size: %"
PRId64
""
,
wc
->
file
.
buf
,
wc
->
file
.
size
);
indexTrace
(
"open read file name:%s, file size: %"
PRId64
""
,
wc
->
file
.
buf
,
wc
->
file
.
size
);
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
...
@@ -598,17 +601,11 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
...
@@ -598,17 +601,11 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
indexError
(
"failed to write data: %s, offset: %d len: %d"
,
v
->
colVal
,
v
->
offset
,
indexError
(
"failed to write data: %s, offset: %d len: %d"
,
v
->
colVal
,
v
->
offset
,
(
int
)
taosArrayGetSize
(
v
->
tableId
));
(
int
)
taosArrayGetSize
(
v
->
tableId
));
}
else
{
}
else
{
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
indexInfo
(
"success to write data: %s, offset: %d len: %d"
,
v
->
colVal
,
v
->
offset
,
// (int)taosArrayGetSize(v->tableId));
(
int
)
taosArrayGetSize
(
v
->
tableId
));
// indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx));
}
}
}
}
fstBuilderFinish
(
tw
->
fb
);
fstBuilderDestroy
(
tw
->
fb
);
fstBuilderDestroy
(
tw
->
fb
);
tw
->
fb
=
NULL
;
tfileWriteFooter
(
tw
);
tfileWriteFooter
(
tw
);
return
0
;
return
0
;
}
}
...
@@ -627,8 +624,8 @@ void tfileWriterDestroy(TFileWriter* tw) {
...
@@ -627,8 +624,8 @@ void tfileWriterDestroy(TFileWriter* tw) {
taosMemoryFree
(
tw
);
taosMemoryFree
(
tw
);
}
}
IndexTFile
*
idxTFileCreate
(
const
char
*
path
)
{
IndexTFile
*
idxTFileCreate
(
SIndex
*
idx
,
const
char
*
path
)
{
TFileCache
*
cache
=
tfileCacheCreate
(
path
);
TFileCache
*
cache
=
tfileCacheCreate
(
idx
,
path
);
if
(
cache
==
NULL
)
{
if
(
cache
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
...
@@ -859,18 +856,6 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
...
@@ -859,18 +856,6 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
return
0
;
return
0
;
}
}
return
-
1
;
return
-
1
;
// if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
// FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
// if (fstBuilderInsert(write->fb, key, tval->offset)) {
// fstSliceDestroy(&key);
// return 0;
// }
// fstSliceDestroy(&key);
// return -1;
//} else {
// // handle other type later
//}
}
}
static
int
tfileWriteFooter
(
TFileWriter
*
write
)
{
static
int
tfileWriteFooter
(
TFileWriter
*
write
)
{
char
buf
[
sizeof
(
FILE_MAGIC_NUMBER
)
+
1
]
=
{
0
};
char
buf
[
sizeof
(
FILE_MAGIC_NUMBER
)
+
1
]
=
{
0
};
...
@@ -887,6 +872,7 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
...
@@ -887,6 +872,7 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
char
buf
[
TFILE_HEADER_SIZE
]
=
{
0
};
char
buf
[
TFILE_HEADER_SIZE
]
=
{
0
};
int64_t
nread
=
reader
->
ctx
->
readFrom
(
reader
->
ctx
,
buf
,
sizeof
(
buf
),
0
);
int64_t
nread
=
reader
->
ctx
->
readFrom
(
reader
->
ctx
,
buf
,
sizeof
(
buf
),
0
);
if
(
nread
==
-
1
)
{
if
(
nread
==
-
1
)
{
indexError
(
"actual Read: %d, to read: %d, errno: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
errno
,
indexError
(
"actual Read: %d, to read: %d, errno: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
errno
,
reader
->
ctx
->
file
.
buf
);
reader
->
ctx
->
file
.
buf
);
...
@@ -914,7 +900,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
...
@@ -914,7 +900,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
int64_t
cost
=
taosGetTimestampUs
()
-
ts
;
int64_t
cost
=
taosGetTimestampUs
()
-
ts
;
indexInfo
(
"nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %"
PRId64
", time cost: %"
PRId64
indexInfo
(
"nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %"
PRId64
", time cost: %"
PRId64
"us"
,
"us"
,
nread
,
reader
->
header
.
fstOffset
,
fstSize
,
ctx
->
file
.
buf
,
ctx
->
file
.
size
,
cost
);
nread
,
reader
->
header
.
fstOffset
,
fstSize
,
ctx
->
file
.
buf
,
size
,
cost
);
// we assuse fst size less than FST_MAX_SIZE
// we assuse fst size less than FST_MAX_SIZE
assert
(
nread
>
0
&&
nread
<=
fstSize
);
assert
(
nread
>
0
&&
nread
<=
fstSize
);
...
...
source/libs/index/test/fstTest.cc
浏览文件 @
843449f1
...
@@ -19,7 +19,7 @@ class FstWriter {
...
@@ -19,7 +19,7 @@ class FstWriter {
public:
public:
FstWriter
()
{
FstWriter
()
{
taosRemoveFile
(
fileName
.
c_str
());
taosRemoveFile
(
fileName
.
c_str
());
_wc
=
idxFileCtxCreate
(
TF
ile
,
fileName
.
c_str
(),
false
,
64
*
1024
*
1024
);
_wc
=
idxFileCtxCreate
(
TF
ILE
,
fileName
.
c_str
(),
false
,
64
*
1024
*
1024
);
_b
=
fstBuilderCreate
(
_wc
,
0
);
_b
=
fstBuilderCreate
(
_wc
,
0
);
}
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
...
@@ -34,7 +34,7 @@ class FstWriter {
...
@@ -34,7 +34,7 @@ class FstWriter {
return
ok
;
return
ok
;
}
}
~
FstWriter
()
{
~
FstWriter
()
{
fstBuilderFinish
(
_b
);
//
fstBuilderFinish(_b);
fstBuilderDestroy
(
_b
);
fstBuilderDestroy
(
_b
);
idxFileCtxDestroy
(
_wc
,
false
);
idxFileCtxDestroy
(
_wc
,
false
);
...
@@ -48,7 +48,7 @@ class FstWriter {
...
@@ -48,7 +48,7 @@ class FstWriter {
class
FstReadMemory
{
class
FstReadMemory
{
public:
public:
FstReadMemory
(
int32_t
size
,
const
std
::
string
&
fileName
=
TD_TMP_DIR_PATH
"tindex.tindex"
)
{
FstReadMemory
(
int32_t
size
,
const
std
::
string
&
fileName
=
TD_TMP_DIR_PATH
"tindex.tindex"
)
{
_wc
=
idxFileCtxCreate
(
TF
ile
,
fileName
.
c_str
(),
true
,
64
*
1024
);
_wc
=
idxFileCtxCreate
(
TF
ILE
,
fileName
.
c_str
(),
true
,
64
*
1024
);
_w
=
idxFileCreate
(
_wc
);
_w
=
idxFileCreate
(
_wc
);
_size
=
size
;
_size
=
size
;
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
...
@@ -598,7 +598,9 @@ void fst_get(Fst* fst) {
...
@@ -598,7 +598,9 @@ void fst_get(Fst* fst) {
void
validateTFile
(
char
*
arg
)
{
void
validateTFile
(
char
*
arg
)
{
std
::
thread
threads
[
NUM_OF_THREAD
];
std
::
thread
threads
[
NUM_OF_THREAD
];
// std::vector<std::thread> threads;
// std::vector<std::thread> threads;
TFileReader
*
reader
=
tfileReaderOpen
(
arg
,
0
,
20000000
,
"tag1"
);
SIndex
*
index
=
(
SIndex
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SIndex
));
index
->
path
=
strdup
(
arg
);
TFileReader
*
reader
=
tfileReaderOpen
(
index
,
0
,
20000000
,
"tag1"
);
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
threads
[
i
]
=
std
::
thread
(
fst_get
,
reader
->
fst
);
threads
[
i
]
=
std
::
thread
(
fst_get
,
reader
->
fst
);
...
@@ -617,7 +619,7 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
...
@@ -617,7 +619,7 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
uint64_t
suid
=
atoi
(
uid
);
uint64_t
suid
=
atoi
(
uid
);
int
version
=
atoi
(
ver
);
int
version
=
atoi
(
ver
);
TFileReader
*
reader
=
tfileReaderOpen
(
path
,
suid
,
version
,
colName
);
TFileReader
*
reader
=
tfileReaderOpen
(
NULL
,
suid
,
version
,
colName
);
Iterate
*
iter
=
tfileIteratorCreate
(
reader
);
Iterate
*
iter
=
tfileIteratorCreate
(
reader
);
bool
tn
=
iter
?
iter
->
next
(
iter
)
:
false
;
bool
tn
=
iter
?
iter
->
next
(
iter
)
:
false
;
...
...
source/libs/index/test/fstUT.cc
浏览文件 @
843449f1
...
@@ -39,7 +39,7 @@ static void EnvCleanup() {}
...
@@ -39,7 +39,7 @@ static void EnvCleanup() {}
class
FstWriter
{
class
FstWriter
{
public:
public:
FstWriter
()
{
FstWriter
()
{
_wc
=
idxFileCtxCreate
(
TF
ile
,
tindex
,
false
,
64
*
1024
*
1024
);
_wc
=
idxFileCtxCreate
(
TF
ILE
,
tindex
,
false
,
64
*
1024
*
1024
);
_b
=
fstBuilderCreate
(
_wc
,
0
);
_b
=
fstBuilderCreate
(
_wc
,
0
);
}
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
...
@@ -54,7 +54,6 @@ class FstWriter {
...
@@ -54,7 +54,6 @@ class FstWriter {
return
ok
;
return
ok
;
}
}
~
FstWriter
()
{
~
FstWriter
()
{
fstBuilderFinish
(
_b
);
fstBuilderDestroy
(
_b
);
fstBuilderDestroy
(
_b
);
idxFileCtxDestroy
(
_wc
,
false
);
idxFileCtxDestroy
(
_wc
,
false
);
...
@@ -68,7 +67,7 @@ class FstWriter {
...
@@ -68,7 +67,7 @@ class FstWriter {
class
FstReadMemory
{
class
FstReadMemory
{
public:
public:
FstReadMemory
(
size_t
size
)
{
FstReadMemory
(
size_t
size
)
{
_wc
=
idxFileCtxCreate
(
TF
ile
,
tindex
,
true
,
64
*
1024
);
_wc
=
idxFileCtxCreate
(
TF
ILE
,
tindex
,
true
,
64
*
1024
);
_w
=
idxFileCreate
(
_wc
);
_w
=
idxFileCreate
(
_wc
);
_size
=
size
;
_size
=
size
;
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
...
...
source/libs/index/test/indexTests.cc
浏览文件 @
843449f1
...
@@ -50,7 +50,7 @@ class DebugInfo {
...
@@ -50,7 +50,7 @@ class DebugInfo {
class
FstWriter
{
class
FstWriter
{
public:
public:
FstWriter
()
{
FstWriter
()
{
_wc
=
idxFileCtxCreate
(
TF
ile
,
TD_TMP_DIR_PATH
"tindex"
,
false
,
64
*
1024
*
1024
);
_wc
=
idxFileCtxCreate
(
TF
ILE
,
TD_TMP_DIR_PATH
"tindex"
,
false
,
64
*
1024
*
1024
);
_b
=
fstBuilderCreate
(
NULL
,
0
);
_b
=
fstBuilderCreate
(
NULL
,
0
);
}
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
...
@@ -60,7 +60,7 @@ class FstWriter {
...
@@ -60,7 +60,7 @@ class FstWriter {
return
ok
;
return
ok
;
}
}
~
FstWriter
()
{
~
FstWriter
()
{
fstBuilderFinish
(
_b
);
//
fstBuilderFinish(_b);
fstBuilderDestroy
(
_b
);
fstBuilderDestroy
(
_b
);
idxFileCtxDestroy
(
_wc
,
false
);
idxFileCtxDestroy
(
_wc
,
false
);
...
@@ -74,7 +74,7 @@ class FstWriter {
...
@@ -74,7 +74,7 @@ class FstWriter {
class
FstReadMemory
{
class
FstReadMemory
{
public:
public:
FstReadMemory
(
size_t
size
)
{
FstReadMemory
(
size_t
size
)
{
_wc
=
idxFileCtxCreate
(
TF
ile
,
TD_TMP_DIR_PATH
"tindex"
,
true
,
64
*
1024
);
_wc
=
idxFileCtxCreate
(
TF
ILE
,
TD_TMP_DIR_PATH
"tindex"
,
true
,
64
*
1024
);
_w
=
idxFileCreate
(
_wc
);
_w
=
idxFileCreate
(
_wc
);
_size
=
size
;
_size
=
size
;
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
...
@@ -292,14 +292,12 @@ class IndexEnv : public ::testing::Test {
...
@@ -292,14 +292,12 @@ class IndexEnv : public ::testing::Test {
virtual
void
SetUp
()
{
virtual
void
SetUp
()
{
initLog
();
initLog
();
taosRemoveDir
(
path
);
taosRemoveDir
(
path
);
opts
=
indexOptsCreate
();
SIndexOpts
opts
;
int
ret
=
indexOpen
(
opts
,
path
,
&
index
);
opts
.
cacheSize
=
1024
*
1024
*
4
;
int
ret
=
indexOpen
(
&
opts
,
path
,
&
index
);
assert
(
ret
==
0
);
assert
(
ret
==
0
);
}
}
virtual
void
TearDown
()
{
virtual
void
TearDown
()
{
indexClose
(
index
);
}
indexClose
(
index
);
indexOptsDestroy
(
opts
);
}
const
char
*
path
=
TD_TMP_DIR_PATH
"tindex"
;
const
char
*
path
=
TD_TMP_DIR_PATH
"tindex"
;
SIndexOpts
*
opts
;
SIndexOpts
*
opts
;
...
@@ -391,13 +389,15 @@ class TFileObj {
...
@@ -391,13 +389,15 @@ class TFileObj {
fileName_
=
path
;
fileName_
=
path
;
IFileCtx
*
ctx
=
idxFileCtxCreate
(
TFile
,
path
.
c_str
(),
false
,
64
*
1024
*
1024
);
IFileCtx
*
ctx
=
idxFileCtxCreate
(
TFILE
,
path
.
c_str
(),
false
,
64
*
1024
*
1024
);
ctx
->
lru
=
taosLRUCacheInit
(
1024
*
1024
*
4
,
-
1
,
.5
);
writer_
=
tfileWriterCreate
(
ctx
,
&
header
);
writer_
=
tfileWriterCreate
(
ctx
,
&
header
);
return
writer_
!=
NULL
?
true
:
false
;
return
writer_
!=
NULL
?
true
:
false
;
}
}
bool
InitReader
()
{
bool
InitReader
()
{
IFileCtx
*
ctx
=
idxFileCtxCreate
(
TFile
,
fileName_
.
c_str
(),
true
,
64
*
1024
*
1024
);
IFileCtx
*
ctx
=
idxFileCtxCreate
(
TFILE
,
fileName_
.
c_str
(),
true
,
64
*
1024
*
1024
);
ctx
->
lru
=
taosLRUCacheInit
(
1024
*
1024
*
4
,
-
1
,
.5
);
reader_
=
tfileReaderCreate
(
ctx
);
reader_
=
tfileReaderCreate
(
ctx
);
return
reader_
!=
NULL
?
true
:
false
;
return
reader_
!=
NULL
?
true
:
false
;
}
}
...
@@ -657,7 +657,7 @@ TEST_F(IndexCacheEnv, cache_test) {
...
@@ -657,7 +657,7 @@ TEST_F(IndexCacheEnv, cache_test) {
{
{
std
::
string
colVal
(
"v3"
);
std
::
string
colVal
(
"v3"
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexTermQuery
query
=
{
term
,
QUERY_TERM
};
SIndexTermQuery
query
=
{
term
,
QUERY_TERM
};
SArray
*
ret
=
(
SArray
*
)
taosArrayInit
(
4
,
sizeof
(
suid
));
SArray
*
ret
=
(
SArray
*
)
taosArrayInit
(
4
,
sizeof
(
suid
));
STermValueType
valType
;
STermValueType
valType
;
...
@@ -672,7 +672,7 @@ TEST_F(IndexCacheEnv, cache_test) {
...
@@ -672,7 +672,7 @@ TEST_F(IndexCacheEnv, cache_test) {
{
{
std
::
string
colVal
(
"v2"
);
std
::
string
colVal
(
"v2"
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexTermQuery
query
=
{
term
,
QUERY_TERM
};
SIndexTermQuery
query
=
{
term
,
QUERY_TERM
};
SArray
*
ret
=
(
SArray
*
)
taosArrayInit
(
4
,
sizeof
(
suid
));
SArray
*
ret
=
(
SArray
*
)
taosArrayInit
(
4
,
sizeof
(
suid
));
STermValueType
valType
;
STermValueType
valType
;
...
@@ -698,6 +698,9 @@ class IndexObj {
...
@@ -698,6 +698,9 @@ class IndexObj {
taosMkDir
(
dir
.
c_str
());
taosMkDir
(
dir
.
c_str
());
}
}
taosMkDir
(
dir
.
c_str
());
taosMkDir
(
dir
.
c_str
());
SIndexOpts
opts
;
opts
.
cacheSize
=
1024
*
1024
*
4
;
int
ret
=
indexOpen
(
&
opts
,
dir
.
c_str
(),
&
idx
);
int
ret
=
indexOpen
(
&
opts
,
dir
.
c_str
(),
&
idx
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
// opt
// opt
...
@@ -707,7 +710,7 @@ class IndexObj {
...
@@ -707,7 +710,7 @@ class IndexObj {
}
}
void
Del
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
,
uint64_t
uid
)
{
void
Del
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
,
uint64_t
uid
)
{
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
DEL_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
DEL_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
Put
(
terms
,
uid
);
Put
(
terms
,
uid
);
...
@@ -716,7 +719,7 @@ class IndexObj {
...
@@ -716,7 +719,7 @@ class IndexObj {
int
WriteMillonData
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
=
"Hello world"
,
int
WriteMillonData
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
=
"Hello world"
,
size_t
numOfTable
=
100
*
10000
)
{
size_t
numOfTable
=
100
*
10000
)
{
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
numOfTable
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
numOfTable
;
i
++
)
{
...
@@ -738,7 +741,7 @@ class IndexObj {
...
@@ -738,7 +741,7 @@ class IndexObj {
tColVal
[
taosRand
()
%
colValSize
]
=
'a'
+
k
%
26
;
tColVal
[
taosRand
()
%
colValSize
]
=
'a'
+
k
%
26
;
}
}
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
tColVal
.
c_str
(),
tColVal
.
size
());
tColVal
.
c_str
(),
tColVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
j
=
0
;
j
<
skip
;
j
++
)
{
for
(
size_t
j
=
0
;
j
<
skip
;
j
++
)
{
...
@@ -774,7 +777,7 @@ class IndexObj {
...
@@ -774,7 +777,7 @@ class IndexObj {
int
SearchOne
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
)
{
int
SearchOne
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
)
{
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermQueryAdd
(
mq
,
term
,
QUERY_TERM
);
indexMultiTermQueryAdd
(
mq
,
term
,
QUERY_TERM
);
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
...
@@ -796,7 +799,7 @@ class IndexObj {
...
@@ -796,7 +799,7 @@ class IndexObj {
int
SearchOneTarget
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
,
uint64_t
val
)
{
int
SearchOneTarget
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
,
uint64_t
val
)
{
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermQueryAdd
(
mq
,
term
,
QUERY_TERM
);
indexMultiTermQueryAdd
(
mq
,
term
,
QUERY_TERM
);
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
...
@@ -821,7 +824,7 @@ class IndexObj {
...
@@ -821,7 +824,7 @@ class IndexObj {
void
PutOne
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
)
{
void
PutOne
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
)
{
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
Put
(
terms
,
10
);
Put
(
terms
,
10
);
indexMultiTermDestroy
(
terms
);
indexMultiTermDestroy
(
terms
);
...
@@ -829,7 +832,7 @@ class IndexObj {
...
@@ -829,7 +832,7 @@ class IndexObj {
void
PutOneTarge
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
,
uint64_t
val
)
{
void
PutOneTarge
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
,
uint64_t
val
)
{
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
Put
(
terms
,
val
);
Put
(
terms
,
val
);
indexMultiTermDestroy
(
terms
);
indexMultiTermDestroy
(
terms
);
...
@@ -845,10 +848,10 @@ class IndexObj {
...
@@ -845,10 +848,10 @@ class IndexObj {
}
}
private:
private:
SIndexOpts
opts
;
SIndexOpts
*
opts
;
SIndex
*
idx
;
SIndex
*
idx
;
int
numOfWrite
;
int
numOfWrite
;
int
numOfRead
;
int
numOfRead
;
};
};
class
IndexEnv2
:
public
::
testing
::
Test
{
class
IndexEnv2
:
public
::
testing
::
Test
{
...
@@ -875,7 +878,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
...
@@ -875,7 +878,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
targetSize
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
targetSize
;
i
++
)
{
...
@@ -890,7 +893,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
...
@@ -890,7 +893,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
std
::
string
colName
(
"tag1"
),
colVal
(
"hello"
);
std
::
string
colName
(
"tag1"
),
colVal
(
"hello"
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
size
;
i
++
)
{
...
@@ -905,7 +908,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
...
@@ -905,7 +908,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
size
*
3
;
i
<
size
*
4
;
i
++
)
{
for
(
size_t
i
=
size
*
3
;
i
<
size
*
4
;
i
++
)
{
...
@@ -920,7 +923,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
...
@@ -920,7 +923,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermQueryAdd
(
mq
,
term
,
QUERY_TERM
);
indexMultiTermQueryAdd
(
mq
,
term
,
QUERY_TERM
);
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
...
@@ -943,7 +946,7 @@ TEST_F(IndexEnv2, testEmptyIndexOpen) {
...
@@ -943,7 +946,7 @@ TEST_F(IndexEnv2, testEmptyIndexOpen) {
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
targetSize
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
targetSize
;
i
++
)
{
...
...
source/libs/index/test/jsonUT.cc
浏览文件 @
843449f1
...
@@ -54,13 +54,12 @@ class JsonEnv : public ::testing::Test {
...
@@ -54,13 +54,12 @@ class JsonEnv : public ::testing::Test {
printf
(
"set up
\n
"
);
printf
(
"set up
\n
"
);
initLog
();
initLog
();
opts
=
indexOptsCreate
();
opts
=
indexOptsCreate
(
1024
*
1024
*
4
);
int
ret
=
indexJsonOpen
(
opts
,
dir
.
c_str
(),
&
index
);
int
ret
=
indexJsonOpen
(
opts
,
dir
.
c_str
(),
&
index
);
assert
(
ret
==
0
);
assert
(
ret
==
0
);
}
}
virtual
void
TearDown
()
{
virtual
void
TearDown
()
{
indexJsonClose
(
index
);
indexJsonClose
(
index
);
indexOptsDestroy
(
opts
);
printf
(
"destory
\n
"
);
printf
(
"destory
\n
"
);
taosMsleep
(
1000
);
taosMsleep
(
1000
);
}
}
...
@@ -71,7 +70,7 @@ class JsonEnv : public ::testing::Test {
...
@@ -71,7 +70,7 @@ class JsonEnv : public ::testing::Test {
static
void
WriteData
(
SIndexJson
*
index
,
const
std
::
string
&
colName
,
int8_t
dtype
,
void
*
data
,
int
dlen
,
int
tableId
,
static
void
WriteData
(
SIndexJson
*
index
,
const
std
::
string
&
colName
,
int8_t
dtype
,
void
*
data
,
int
dlen
,
int
tableId
,
int8_t
operType
=
ADD_VALUE
)
{
int8_t
operType
=
ADD_VALUE
)
{
SIndexTerm
*
term
=
indexTermCreateT
(
1
,
(
SIndexOperOnColumn
)
operType
,
dtype
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
1
,
(
SIndexOperOnColumn
)
operType
,
dtype
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
data
,
dlen
);
(
const
char
*
)
data
,
dlen
);
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
indexJsonPut
(
index
,
terms
,
(
int64_t
)
tableId
);
indexJsonPut
(
index
,
terms
,
(
int64_t
)
tableId
);
...
@@ -82,7 +81,7 @@ static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtyp
...
@@ -82,7 +81,7 @@ static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtyp
static
void
delData
(
SIndexJson
*
index
,
const
std
::
string
&
colName
,
int8_t
dtype
,
void
*
data
,
int
dlen
,
int
tableId
,
static
void
delData
(
SIndexJson
*
index
,
const
std
::
string
&
colName
,
int8_t
dtype
,
void
*
data
,
int
dlen
,
int
tableId
,
int8_t
operType
=
DEL_VALUE
)
{
int8_t
operType
=
DEL_VALUE
)
{
SIndexTerm
*
term
=
indexTermCreateT
(
1
,
(
SIndexOperOnColumn
)
operType
,
dtype
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
1
,
(
SIndexOperOnColumn
)
operType
,
dtype
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
data
,
dlen
);
(
const
char
*
)
data
,
dlen
);
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
indexJsonPut
(
index
,
terms
,
(
int64_t
)
tableId
);
indexJsonPut
(
index
,
terms
,
(
int64_t
)
tableId
);
...
@@ -108,7 +107,7 @@ TEST_F(JsonEnv, testWrite) {
...
@@ -108,7 +107,7 @@ TEST_F(JsonEnv, testWrite) {
std
::
string
colVal
(
"ab"
);
std
::
string
colVal
(
"ab"
);
for
(
int
i
=
0
;
i
<
100
;
i
++
)
{
for
(
int
i
=
0
;
i
<
100
;
i
++
)
{
SIndexTerm
*
term
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
term
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
indexMultiTermAdd
(
terms
,
term
);
indexJsonPut
(
index
,
terms
,
i
);
indexJsonPut
(
index
,
terms
,
i
);
...
@@ -147,7 +146,7 @@ TEST_F(JsonEnv, testWrite) {
...
@@ -147,7 +146,7 @@ TEST_F(JsonEnv, testWrite) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
...
@@ -205,7 +204,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
...
@@ -205,7 +204,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
...
@@ -220,7 +219,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
...
@@ -220,7 +219,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
...
@@ -235,7 +234,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
...
@@ -235,7 +234,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
...
@@ -305,7 +304,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
...
@@ -305,7 +304,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
int
val
=
15
;
int
val
=
15
;
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
...
@@ -319,7 +318,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
...
@@ -319,7 +318,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
...
@@ -334,7 +333,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
...
@@ -334,7 +333,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
int
));
(
const
char
*
)
&
val
,
sizeof
(
int
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
...
@@ -349,7 +348,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
...
@@ -349,7 +348,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_THAN
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_THAN
);
...
@@ -364,7 +363,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
...
@@ -364,7 +363,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_EQUAL
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_EQUAL
);
...
@@ -407,7 +406,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
...
@@ -407,7 +406,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
...
@@ -421,7 +420,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
...
@@ -421,7 +420,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
int
));
(
const
char
*
)
&
val
,
sizeof
(
int
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
...
@@ -436,7 +435,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
...
@@ -436,7 +435,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
...
@@ -450,7 +449,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
...
@@ -450,7 +449,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_THAN
);
...
@@ -464,7 +463,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
...
@@ -464,7 +463,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_EQUAL
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_EQUAL
);
...
@@ -493,7 +492,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
...
@@ -493,7 +492,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_THAN
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_LESS_THAN
);
...
@@ -521,7 +520,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
...
@@ -521,7 +520,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
SIndexTerm
*
q
=
indexTermCreateT
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
colName
.
c_str
(),
colName
.
size
(),
(
const
char
*
)
&
val
,
sizeof
(
val
));
(
const
char
*
)
&
val
,
sizeof
(
val
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_GREATER_EQUAL
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
843449f1
...
@@ -771,6 +771,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
...
@@ -771,6 +771,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
SExHandle
*
exh
=
transAcquireExHandle
(
transGetRefMgt
(),
refId
);
SExHandle
*
exh
=
transAcquireExHandle
(
transGetRefMgt
(),
refId
);
if
(
exh
==
NULL
)
{
if
(
exh
==
NULL
)
{
tDebug
(
"%"
PRId64
" already release"
,
refId
);
tDebug
(
"%"
PRId64
" already release"
,
refId
);
destroyCmsg
(
pMsg
);
return
;
return
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录