Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
144358f0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
144358f0
编写于
1月 04, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj
上级
5d63e312
b30a26fc
变更
27
展开全部
隐藏空白更改
内联
并排
Showing
27 changed file
with
1190 addition
and
645 deletion
+1190
-645
include/common/tmsg.h
include/common/tmsg.h
+0
-42
include/util/encode.h
include/util/encode.h
+246
-317
source/client/CMakeLists.txt
source/client/CMakeLists.txt
+3
-1
source/common/CMakeLists.txt
source/common/CMakeLists.txt
+3
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+0
-131
source/libs/catalog/CMakeLists.txt
source/libs/catalog/CMakeLists.txt
+3
-1
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+9
-0
source/libs/index/inc/index_cache.h
source/libs/index/inc/index_cache.h
+2
-1
source/libs/index/inc/index_fst_counting_writer.h
source/libs/index/inc/index_fst_counting_writer.h
+1
-0
source/libs/index/inc/index_tfile.h
source/libs/index/inc/index_tfile.h
+4
-10
source/libs/index/inc/index_util.h
source/libs/index/inc/index_util.h
+1
-1
source/libs/index/src/index.c
source/libs/index/src/index.c
+51
-18
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+7
-3
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+4
-2
source/libs/index/src/index_fst_counting_writer.c
source/libs/index/src/index_fst_counting_writer.c
+9
-0
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+95
-82
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+111
-4
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+54
-23
source/libs/parser/CMakeLists.txt
source/libs/parser/CMakeLists.txt
+3
-1
source/libs/planner/CMakeLists.txt
source/libs/planner/CMakeLists.txt
+3
-1
source/libs/qcom/CMakeLists.txt
source/libs/qcom/CMakeLists.txt
+5
-3
source/libs/qworker/CMakeLists.txt
source/libs/qworker/CMakeLists.txt
+3
-1
source/libs/scheduler/CMakeLists.txt
source/libs/scheduler/CMakeLists.txt
+3
-1
source/util/CMakeLists.txt
source/util/CMakeLists.txt
+4
-1
source/util/src/encode.c
source/util/src/encode.c
+141
-0
source/util/test/CMakeLists.txt
source/util/test/CMakeLists.txt
+4
-0
source/util/test/encodeTest.cpp
source/util/test/encodeTest.cpp
+421
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
144358f0
...
...
@@ -57,46 +57,6 @@ extern int tMsgDict[];
typedef
uint16_t
tmsg_t
;
/* ------------------------ ENCODE/DECODE FUNCTIONS AND MACROS ------------------------ */
struct
SMEListNode
{
TD_SLIST_NODE
(
SMEListNode
);
SEncoder
coder
;
};
typedef
struct
SMsgEncoder
{
SEncoder
coder
;
TD_SLIST
(
SMEListNode
)
eStack
;
// encode stack
}
SMsgEncoder
;
struct
SMDFreeListNode
{
TD_SLIST_NODE
(
SMDFreeListNode
);
char
payload
[];
};
struct
SMDListNode
{
TD_SLIST_NODE
(
SMDListNode
);
SDecoder
coder
;
};
typedef
struct
SMsgDecoder
{
SDecoder
coder
;
TD_SLIST
(
SMDListNode
)
dStack
;
TD_SLIST
(
SMDFreeListNode
)
freeList
;
}
SMsgDecoder
;
#define TMSG_MALLOC(SIZE, DECODER) \
({ \
void* ptr = malloc((SIZE) + sizeof(struct SMDFreeListNode)); \
if (ptr) { \
TD_SLIST_PUSH(&((DECODER)->freeList), (struct SMDFreeListNode*)ptr); \
ptr = POINTER_SHIFT(ptr, sizeof(struct SMDFreeListNode*)); \
} \
ptr; \
})
void
tmsgInitMsgDecoder
(
SMsgDecoder
*
pMD
,
td_endian_t
endian
,
uint8_t
*
data
,
int64_t
size
);
void
tmsgClearMsgDecoder
(
SMsgDecoder
*
pMD
);
/* ------------------------ OTHER DEFINITIONS ------------------------ */
// IE type
#define TSDB_IE_TYPE_SEC 1
...
...
@@ -1283,8 +1243,6 @@ typedef struct {
SArray
*
pArray
;
}
SVCreateTbBatchReq
;
int
tmsgSVCreateTbReqEncode
(
SMsgEncoder
*
pCoder
,
SVCreateTbReq
*
pReq
);
int
tmsgSVCreateTbReqDecode
(
SMsgDecoder
*
pCoder
,
SVCreateTbReq
*
pReq
);
int
tSerializeSVCreateTbReq
(
void
**
buf
,
SVCreateTbReq
*
pReq
);
void
*
tDeserializeSVCreateTbReq
(
void
*
buf
,
SVCreateTbReq
*
pReq
);
int
tSVCreateTbBatchReqSerialize
(
void
**
buf
,
SVCreateTbBatchReq
*
pReq
);
...
...
include/util/encode.h
浏览文件 @
144358f0
此差异已折叠。
点击以展开。
source/client/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util common transport parser planner catalog scheduler function qcom
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
\ No newline at end of file
source/common/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -12,4 +12,6 @@ target_link_libraries(
INTERFACE api
)
ADD_SUBDIRECTORY
(
test
)
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
source/common/src/tmsg.c
浏览文件 @
144358f0
...
...
@@ -27,77 +27,6 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
static
int
tmsgStartEncode
(
SMsgEncoder
*
pME
);
static
void
tmsgEndEncode
(
SMsgEncoder
*
pME
);
static
int
tmsgStartDecode
(
SMsgDecoder
*
pMD
);
static
void
tmsgEndDecode
(
SMsgDecoder
*
pMD
);
/* ------------------------ ENCODE/DECODE FUNCTIONS ------------------------ */
void
tmsgInitMsgEncoder
(
SMsgEncoder
*
pME
,
td_endian_t
endian
,
uint8_t
*
data
,
int64_t
size
)
{
tInitEncoder
(
&
(
pME
->
coder
),
endian
,
data
,
size
);
TD_SLIST_INIT
(
&
(
pME
->
eStack
));
}
void
tmsgClearMsgEncoder
(
SMsgEncoder
*
pME
)
{
struct
SMEListNode
*
pNode
;
for
(;;)
{
pNode
=
TD_SLIST_HEAD
(
&
(
pME
->
eStack
));
if
(
TD_IS_NULL
(
pNode
))
break
;
TD_SLIST_POP
(
&
(
pME
->
eStack
));
free
(
pNode
);
}
}
void
tmsgInitMsgDecoder
(
SMsgDecoder
*
pMD
,
td_endian_t
endian
,
uint8_t
*
data
,
int64_t
size
)
{
tInitDecoder
(
&
pMD
->
coder
,
endian
,
data
,
size
);
TD_SLIST_INIT
(
&
(
pMD
->
dStack
));
TD_SLIST_INIT
(
&
(
pMD
->
freeList
));
}
void
tmsgClearMsgDecoder
(
SMsgDecoder
*
pMD
)
{
{
struct
SMDFreeListNode
*
pNode
;
for
(;;)
{
pNode
=
TD_SLIST_HEAD
(
&
(
pMD
->
freeList
));
if
(
TD_IS_NULL
(
pNode
))
break
;
TD_SLIST_POP
(
&
(
pMD
->
freeList
));
free
(
pNode
);
}
}
{
struct
SMDListNode
*
pNode
;
for
(;;)
{
pNode
=
TD_SLIST_HEAD
(
&
(
pMD
->
dStack
));
if
(
TD_IS_NULL
(
pNode
))
break
;
TD_SLIST_POP
(
&
(
pMD
->
dStack
));
free
(
pNode
);
}
}
}
/* ------------------------ MESSAGE ENCODE/DECODE ------------------------ */
int
tmsgSVCreateTbReqEncode
(
SMsgEncoder
*
pCoder
,
SVCreateTbReq
*
pReq
)
{
tmsgStartEncode
(
pCoder
);
// TODO
tmsgEndEncode
(
pCoder
);
return
0
;
}
int
tmsgSVCreateTbReqDecode
(
SMsgDecoder
*
pCoder
,
SVCreateTbReq
*
pReq
)
{
tmsgStartDecode
(
pCoder
);
// TODO: decode
// Decode is not end
if
(
pCoder
->
coder
.
pos
!=
pCoder
->
coder
.
size
)
{
// Continue decode
}
tmsgEndDecode
(
pCoder
);
return
0
;
}
int
tSerializeSVCreateTbReq
(
void
**
buf
,
SVCreateTbReq
*
pReq
)
{
int
tlen
=
0
;
...
...
@@ -218,64 +147,4 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
}
return
buf
;
}
/* ------------------------ STATIC METHODS ------------------------ */
static
int
tmsgStartEncode
(
SMsgEncoder
*
pME
)
{
struct
SMEListNode
*
pNode
=
(
struct
SMEListNode
*
)
malloc
(
sizeof
(
*
pNode
));
if
(
TD_IS_NULL
(
pNode
))
return
-
1
;
pNode
->
coder
=
pME
->
coder
;
TD_SLIST_PUSH
(
&
(
pME
->
eStack
),
pNode
);
TD_CODER_MOVE_POS
(
&
(
pME
->
coder
),
sizeof
(
int32_t
));
return
0
;
}
static
void
tmsgEndEncode
(
SMsgEncoder
*
pME
)
{
int32_t
size
;
struct
SMEListNode
*
pNode
;
pNode
=
TD_SLIST_HEAD
(
&
(
pME
->
eStack
));
ASSERT
(
pNode
);
TD_SLIST_POP
(
&
(
pME
->
eStack
));
size
=
pME
->
coder
.
pos
-
pNode
->
coder
.
pos
;
tEncodeI32
(
&
(
pNode
->
coder
),
size
);
free
(
pNode
);
}
static
int
tmsgStartDecode
(
SMsgDecoder
*
pMD
)
{
struct
SMDListNode
*
pNode
;
int32_t
size
;
pNode
=
(
struct
SMDListNode
*
)
malloc
(
sizeof
(
*
pNode
));
if
(
pNode
==
NULL
)
return
-
1
;
tDecodeI32
(
&
(
pMD
->
coder
),
&
size
);
pNode
->
coder
=
pMD
->
coder
;
TD_SLIST_PUSH
(
&
(
pMD
->
dStack
),
pNode
);
pMD
->
coder
.
pos
=
0
;
pMD
->
coder
.
size
=
size
-
sizeof
(
int32_t
);
pMD
->
coder
.
data
=
TD_CODER_CURRENT
(
&
(
pNode
->
coder
));
return
0
;
}
static
void
tmsgEndDecode
(
SMsgDecoder
*
pMD
)
{
ASSERT
(
pMD
->
coder
.
pos
==
pMD
->
coder
.
size
);
struct
SMDListNode
*
pNode
;
pNode
=
TD_SLIST_HEAD
(
&
(
pMD
->
dStack
));
ASSERT
(
pNode
);
TD_SLIST_POP
(
&
(
pMD
->
dStack
));
pNode
->
coder
.
pos
+=
pMD
->
coder
.
size
;
pMD
->
coder
=
pNode
->
coder
;
free
(
pNode
);
}
\ No newline at end of file
source/libs/catalog/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util transport qcom
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
\ No newline at end of file
source/libs/index/inc/indexInt.h
浏览文件 @
144358f0
...
...
@@ -108,8 +108,17 @@ void iterateValueDestroy(IterateValue* iv, bool destroy);
extern
void
*
indexQhandle
;
typedef
struct
TFileCacheKey
{
uint64_t
suid
;
uint8_t
colType
;
char
*
colName
;
int32_t
nColName
;
}
ICacheKey
;
int
indexFlushCacheTFile
(
SIndex
*
sIdx
,
void
*
);
int32_t
indexSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
);
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
...
...
source/libs/index/inc/index_cache.h
浏览文件 @
144358f0
...
...
@@ -42,6 +42,7 @@ typedef struct IndexCache {
int32_t
version
;
int32_t
nTerm
;
int8_t
type
;
uint64_t
suid
;
pthread_mutex_t
mtx
;
}
IndexCache
;
...
...
@@ -58,7 +59,7 @@ typedef struct CacheTerm {
}
CacheTerm
;
//
IndexCache
*
indexCacheCreate
(
SIndex
*
idx
,
const
char
*
colName
,
int8_t
type
);
IndexCache
*
indexCacheCreate
(
SIndex
*
idx
,
uint64_t
suid
,
const
char
*
colName
,
int8_t
type
);
void
indexCacheDestroy
(
void
*
cache
);
...
...
source/libs/index/inc/index_fst_counting_writer.h
浏览文件 @
144358f0
...
...
@@ -38,6 +38,7 @@ typedef struct WriterCtx {
int
fd
;
bool
readOnly
;
char
buf
[
256
];
int
size
;
}
file
;
struct
{
int32_t
capa
;
...
...
source/libs/index/inc/index_tfile.h
浏览文件 @
144358f0
...
...
@@ -49,13 +49,6 @@ typedef struct TFileValue {
int32_t
offset
;
}
TFileValue
;
typedef
struct
TFileCacheKey
{
uint64_t
suid
;
uint8_t
colType
;
char
*
colName
;
int32_t
nColName
;
}
TFileCacheKey
;
// table cache
// refactor to LRU cache later
typedef
struct
TFileCache
{
...
...
@@ -103,10 +96,10 @@ typedef struct TFileReaderOpt {
// tfile cache, manage tindex reader
TFileCache
*
tfileCacheCreate
(
const
char
*
path
);
void
tfileCacheDestroy
(
TFileCache
*
tcache
);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
TFile
CacheKey
*
key
);
void
tfileCachePut
(
TFileCache
*
tcache
,
TFile
CacheKey
*
key
,
TFileReader
*
reader
);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
I
CacheKey
*
key
);
void
tfileCachePut
(
TFileCache
*
tcache
,
I
CacheKey
*
key
,
TFileReader
*
reader
);
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
char
*
colName
);
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
uint64_t
suid
,
char
*
colName
);
TFileReader
*
tfileReaderOpen
(
char
*
path
,
uint64_t
suid
,
int32_t
version
,
const
char
*
colName
);
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
);
...
...
@@ -124,6 +117,7 @@ int tfileWriterFinish(TFileWriter* tw);
//
IndexTFile
*
indexTFileCreate
(
const
char
*
path
);
void
indexTFileDestroy
(
IndexTFile
*
tfile
);
int
indexTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
);
int
indexTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SArray
*
result
);
...
...
source/libs/index/inc/index_util.h
浏览文件 @
144358f0
...
...
@@ -34,7 +34,7 @@ extern "C" {
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
do { \
type c = var; \
assert(sizeof(
var) == sizeof(type));
\
assert(sizeof(
type) == sizeof(c));
\
memcpy((void*)buf, (void*)&c, sizeof(c)); \
buf += sizeof(c); \
} while (0)
...
...
source/libs/index/src/index.c
浏览文件 @
144358f0
...
...
@@ -17,6 +17,7 @@
#include "indexInt.h"
#include "index_cache.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tdef.h"
#include "tsched.h"
...
...
@@ -72,6 +73,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#ifdef USE_INVERTED_INDEX
// sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx
->
tindex
=
indexTFileCreate
(
path
);
if
(
sIdx
->
tindex
==
NULL
)
{
goto
END
;
}
sIdx
->
colObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
cVersion
=
1
;
sIdx
->
path
=
calloc
(
1
,
strlen
(
path
)
+
1
);
...
...
@@ -82,6 +84,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
return
0
;
#endif
END:
if
(
sIdx
!=
NULL
)
{
indexClose
(
sIdx
);
}
*
index
=
NULL
;
return
-
1
;
...
...
@@ -102,6 +106,7 @@ void indexClose(SIndex* sIdx) {
}
taosHashCleanup
(
sIdx
->
colObj
);
pthread_mutex_destroy
(
&
sIdx
->
mtx
);
indexTFileDestroy
(
sIdx
->
tindex
);
#endif
free
(
sIdx
->
path
);
free
(
sIdx
);
...
...
@@ -130,18 +135,28 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
// TODO(yihao): reduce the lock range
pthread_mutex_lock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
)};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
if
(
cache
==
NULL
)
{
IndexCache
*
pCache
=
indexCacheCreate
(
index
,
p
->
colName
,
p
->
colType
);
taosHashPut
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
,
&
pCache
,
sizeof
(
void
*
));
IndexCache
*
pCache
=
indexCacheCreate
(
index
,
p
->
suid
,
p
->
colName
,
p
->
colType
);
taosHashPut
(
index
->
colObj
,
buf
,
sz
,
&
pCache
,
sizeof
(
void
*
));
}
}
pthread_mutex_unlock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
)};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
assert
(
*
cache
!=
NULL
);
int
ret
=
indexCachePut
(
*
cache
,
p
,
uid
);
if
(
ret
!=
0
)
{
return
ret
;
}
...
...
@@ -200,7 +215,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
indexInterResultsDestroy
(
interResults
);
#endif
return
1
;
return
0
;
}
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
)
{
...
...
@@ -296,7 +311,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
// Get col info
IndexCache
*
cache
=
NULL
;
pthread_mutex_lock
(
&
sIdx
->
mtx
);
IndexCache
**
pCache
=
taosHashGet
(
sIdx
->
colObj
,
colName
,
nColName
);
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
term
->
suid
,
.
colName
=
term
->
colName
,
.
nColName
=
strlen
(
term
->
colName
)};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
IndexCache
**
pCache
=
taosHashGet
(
sIdx
->
colObj
,
buf
,
sz
);
if
(
pCache
==
NULL
)
{
pthread_mutex_unlock
(
&
sIdx
->
mtx
);
return
-
1
;
...
...
@@ -360,6 +380,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
if
(
sz
>
0
)
{
// TODO(yihao): remove duplicate tableid
TFileValue
*
lv
=
taosArrayGetP
(
result
,
sz
-
1
);
// indexError("merge colVal: %s", lv->colVal);
if
(
strcmp
(
lv
->
colVal
,
tv
->
colVal
)
==
0
)
{
taosArrayAddAll
(
lv
->
tableId
,
tv
->
tableId
);
tfileValueDestroy
(
tv
);
...
...
@@ -368,6 +389,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
}
}
else
{
taosArrayPush
(
result
,
&
tv
);
// indexError("merge colVal: %s", tv->colVal);
}
}
static
void
indexDestroyTempResult
(
SArray
*
result
)
{
...
...
@@ -383,10 +405,12 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
indexWarn
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
IndexCache
*
pCache
=
(
IndexCache
*
)
cache
;
TFileReader
*
pReader
=
tfileGetReaderByCol
(
sIdx
->
tindex
,
pCache
->
colName
);
TFileReader
*
pReader
=
tfileGetReaderByCol
(
sIdx
->
tindex
,
pCache
->
suid
,
pCache
->
colName
);
if
(
pReader
==
NULL
)
{
indexWarn
(
"empty pReader found"
);
}
// handle flush
Iterate
*
cacheIter
=
indexCacheIteratorCreate
(
pCache
);
Iterate
*
tfileIter
=
tfileIteratorCreate
(
pReader
);
if
(
tfileIter
==
NULL
)
{
indexWarn
(
"empty tfile reader iterator"
);
}
SArray
*
result
=
taosArrayInit
(
1024
,
sizeof
(
void
*
));
...
...
@@ -459,14 +483,14 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
}
else
{
if
(
value
->
val
!=
NULL
)
{
taosArrayClear
(
value
->
val
);
}
}
//
free(value->colVal);
free
(
value
->
colVal
);
value
->
colVal
=
NULL
;
}
static
int
indexGenTFile
(
SIndex
*
sIdx
,
IndexCache
*
cache
,
SArray
*
batch
)
{
int32_t
version
=
CACHE_VERSION
(
cache
);
uint8_t
colType
=
cache
->
type
;
TFileWriter
*
tw
=
tfileWriterOpen
(
sIdx
->
path
,
sIdx
->
suid
,
version
,
cache
->
colName
,
colType
);
TFileWriter
*
tw
=
tfileWriterOpen
(
sIdx
->
path
,
cache
->
suid
,
version
,
cache
->
colName
,
colType
);
if
(
tw
==
NULL
)
{
indexError
(
"failed to open file to write"
);
return
-
1
;
...
...
@@ -479,14 +503,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
}
tfileWriterClose
(
tw
);
TFileReader
*
reader
=
tfileReaderOpen
(
sIdx
->
path
,
sIdx
->
suid
,
version
,
cache
->
colName
);
TFileReader
*
reader
=
tfileReaderOpen
(
sIdx
->
path
,
cache
->
suid
,
version
,
cache
->
colName
);
char
buf
[
128
]
=
{
0
};
TFileHeader
*
header
=
&
reader
->
header
;
ICacheKey
key
=
{
.
suid
=
cache
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
),
.
colType
=
header
->
colType
};
char
buf
[
128
]
=
{
0
};
TFileHeader
*
header
=
&
reader
->
header
;
TFileCacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
),
.
colType
=
header
->
colType
};
pthread_mutex_lock
(
&
sIdx
->
mtx
);
IndexTFile
*
ifile
=
(
IndexTFile
*
)
sIdx
->
tindex
;
...
...
@@ -497,3 +520,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
END:
tfileWriterClose
(
tw
);
}
int32_t
indexSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
)
{
char
*
p
=
buf
;
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
suid
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF
(
buf
,
key
,
colName
,
key
->
nColName
);
return
buf
-
p
;
}
source/libs/index/src/index_cache.c
浏览文件 @
144358f0
...
...
@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT
10000 * 1
0
#define MEM_TERM_LIMIT
5 * 1000
0
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
...
...
@@ -40,7 +40,7 @@ static bool indexCacheIteratorNext(Iterate* itera);
static
IterateValue
*
indexCacheIteratorGetValue
(
Iterate
*
iter
);
IndexCache
*
indexCacheCreate
(
SIndex
*
idx
,
const
char
*
colName
,
int8_t
type
)
{
IndexCache
*
indexCacheCreate
(
SIndex
*
idx
,
uint64_t
suid
,
const
char
*
colName
,
int8_t
type
)
{
IndexCache
*
cache
=
calloc
(
1
,
sizeof
(
IndexCache
));
if
(
cache
==
NULL
)
{
indexError
(
"failed to create index cache"
);
...
...
@@ -53,7 +53,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
cache
->
type
=
type
;
cache
->
index
=
idx
;
cache
->
version
=
0
;
cache
->
suid
=
suid
;
pthread_mutex_init
(
&
cache
->
mtx
,
NULL
);
indexCacheRef
(
cache
);
return
cache
;
...
...
@@ -150,6 +150,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
MemTable
*
tbl
=
cache
->
imm
;
iiter
->
val
.
val
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
iiter
->
val
.
colVal
=
NULL
;
iiter
->
iter
=
tbl
!=
NULL
?
tSkipListCreateIter
(
tbl
->
mem
)
:
NULL
;
iiter
->
next
=
indexCacheIteratorNext
;
iiter
->
getValue
=
indexCacheIteratorGetValue
;
...
...
@@ -353,6 +354,9 @@ static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator
*
iter
=
itera
->
iter
;
if
(
iter
==
NULL
)
{
return
false
;
}
IterateValue
*
iv
=
&
itera
->
val
;
if
(
iv
->
colVal
!=
NULL
&&
iv
->
val
!=
NULL
)
{
// indexError("value in cache: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
}
iterateValueDestroy
(
iv
,
false
);
bool
next
=
tSkipListIterNext
(
iter
);
...
...
source/libs/index/src/index_fst.c
浏览文件 @
144358f0
...
...
@@ -319,7 +319,7 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) {
assert
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
uint8_t
val
;
COMMON_INDEX
(
inp
,
0
x
111111
,
val
);
COMMON_INDEX
(
inp
,
0
b
111111
,
val
);
s
->
val
=
(
s
->
val
&
fstStateDict
[
s
->
state
].
val
)
|
val
;
}
...
...
@@ -369,7 +369,7 @@ uint8_t fstStateInput(FstState* s, FstNode* node) {
bool
null
=
false
;
uint8_t
inp
=
fstStateCommInput
(
s
,
&
null
);
uint8_t
*
data
=
fstSliceData
(
slice
,
NULL
);
return
null
==
false
?
inp
:
data
[
-
1
];
return
null
==
false
?
inp
:
data
[
node
->
start
-
1
];
}
uint8_t
fstStateInputForAnyTrans
(
FstState
*
s
,
FstNode
*
node
,
uint64_t
i
)
{
assert
(
s
->
state
==
AnyTrans
);
...
...
@@ -1062,6 +1062,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) {
}
else
{
*
null
=
true
;
}
fstNodeDestroy
(
node
);
return
res
;
}
...
...
@@ -1286,6 +1287,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
StreamWithStateResult
*
result
=
swsResultCreate
(
&
slice
,
fOutput
,
tState
);
free
(
buf
);
fstSliceDestroy
(
&
slice
);
taosArrayDestroy
(
nodes
);
return
result
;
}
free
(
buf
);
...
...
source/libs/index/src/index_fst_counting_writer.c
浏览文件 @
144358f0
...
...
@@ -72,9 +72,17 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
if
(
readOnly
==
false
)
{
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx
->
file
.
fd
=
tfOpenCreateWriteAppend
(
path
);
struct
stat
fstat
;
stat
(
path
,
&
fstat
);
ctx
->
file
.
size
=
fstat
.
st_size
;
}
else
{
// ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
ctx
->
file
.
fd
=
tfOpenRead
(
path
);
struct
stat
fstat
;
stat
(
path
,
&
fstat
);
ctx
->
file
.
size
=
fstat
.
st_size
;
}
memcpy
(
ctx
->
file
.
buf
,
path
,
strlen
(
path
));
if
(
ctx
->
file
.
fd
<
0
)
{
...
...
@@ -104,6 +112,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
free
(
ctx
->
mem
.
buf
);
}
else
{
tfClose
(
ctx
->
file
.
fd
);
ctx
->
flush
(
ctx
);
if
(
remove
)
{
unlink
(
ctx
->
file
.
buf
);
}
}
free
(
ctx
);
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
144358f0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
p
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
...
...
@@ -45,13 +45,13 @@ static int tfileReaderLoadHeader(TFileReader* reader);
static
int
tfileReaderLoadFst
(
TFileReader
*
reader
);
static
int
tfileReaderLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
);
static
int
tfileGetFileList
(
const
char
*
path
,
SArray
*
result
);
static
int
tfileRmExpireFile
(
SArray
*
result
);
static
void
tfileDestroyFileName
(
void
*
elem
);
static
int
tfileCompare
(
const
void
*
a
,
const
void
*
b
);
static
int
tfileParseFileName
(
const
char
*
filename
,
uint64_t
*
suid
,
int
*
colId
,
int
*
version
);
static
void
tfileGenFileName
(
char
*
filename
,
uint64_t
suid
,
int
colId
,
int
version
);
static
void
tfileSerialCacheKey
(
TFileCacheKey
*
key
,
char
*
buf
);
static
SArray
*
tfileGetFileList
(
const
char
*
path
);
static
int
tfileRmExpireFile
(
SArray
*
result
);
static
void
tfileDestroyFileName
(
void
*
elem
);
static
int
tfileCompare
(
const
void
*
a
,
const
void
*
b
);
static
int
tfileParseFileName
(
const
char
*
filename
,
uint64_t
*
suid
,
char
*
col
,
int
*
version
);
static
void
tfileGenFileName
(
char
*
filename
,
uint64_t
suid
,
const
char
*
col
,
int
version
);
static
void
tfileGenFileFullName
(
char
*
fullname
,
const
char
*
path
,
uint64_t
suid
,
const
char
*
col
,
int32_t
version
);
TFileCache
*
tfileCacheCreate
(
const
char
*
path
)
{
TFileCache
*
tcache
=
calloc
(
1
,
sizeof
(
TFileCache
));
...
...
@@ -60,38 +60,41 @@ TFileCache* tfileCacheCreate(const char* path) {
tcache
->
tableCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
tcache
->
capacity
=
64
;
SArray
*
files
=
taosArrayInit
(
4
,
sizeof
(
void
*
));
tfileGetFileList
(
path
,
files
);
taosArraySort
(
files
,
tfileCompare
);
tfileRmExpireFile
(
files
);
SArray
*
files
=
tfileGetFileList
(
path
);
uint64_t
suid
;
int32_t
colId
,
version
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
files
);
i
++
)
{
char
*
file
=
taosArrayGetP
(
files
,
i
);
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
(
int
*
)
&
colId
,
(
int
*
)
&
version
))
{
// refactor later, use colname and version info
char
colName
[
256
]
=
{
0
};
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
colName
,
(
int
*
)
&
version
))
{
indexInfo
(
"try parse invalid file: %s, skip it"
,
file
);
continue
;
}
WriterCtx
*
wc
=
writerCtxCreate
(
TFile
,
file
,
true
,
1024
*
1024
*
64
);
char
fullName
[
256
]
=
{
0
};
sprintf
(
fullName
,
"%s/%s"
,
path
,
file
);
WriterCtx
*
wc
=
writerCtxCreate
(
TFile
,
fullName
,
true
,
1024
*
1024
*
64
);
if
(
wc
==
NULL
)
{
indexError
(
"failed to open index:%s"
,
file
);
goto
End
;
}
char
buf
[
128
]
=
{
0
};
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
TFileHeader
*
header
=
&
reader
->
header
;
TFileCacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
),
.
colType
=
header
->
colType
};
tfileSerialCacheKey
(
&
key
,
buf
);
char
buf
[
128
]
=
{
0
};
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
TFileHeader
*
header
=
&
reader
->
header
;
ICacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
),
.
colType
=
header
->
colType
};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
taosHashPut
(
tcache
->
tableCache
,
buf
,
sz
,
&
reader
,
sizeof
(
void
*
));
tfileReaderRef
(
reader
);
// indexTable
taosHashPut
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
),
&
reader
,
sizeof
(
void
*
));
}
taosArrayDestroyEx
(
files
,
tfileDestroyFileName
);
return
tcache
;
...
...
@@ -117,30 +120,30 @@ void tfileCacheDestroy(TFileCache* tcache) {
free
(
tcache
);
}
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
TFile
CacheKey
*
key
)
{
char
buf
[
128
]
=
{
0
};
tfile
SerialCacheKey
(
key
,
buf
);
TFileReader
**
reader
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
s
trlen
(
buf
)
);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
I
CacheKey
*
key
)
{
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
index
SerialCacheKey
(
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
TFileReader
**
reader
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
s
z
);
if
(
reader
==
NULL
)
{
return
NULL
;
}
tfileReaderRef
(
*
reader
);
return
*
reader
;
}
void
tfileCachePut
(
TFileCache
*
tcache
,
TFile
CacheKey
*
key
,
TFileReader
*
reader
)
{
char
buf
[
128
]
=
{
0
};
tfile
SerialCacheKey
(
key
,
buf
);
void
tfileCachePut
(
TFileCache
*
tcache
,
I
CacheKey
*
key
,
TFileReader
*
reader
)
{
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
index
SerialCacheKey
(
key
,
buf
);
// remove last version index reader
TFileReader
**
p
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
s
trlen
(
buf
)
);
TFileReader
**
p
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
s
z
);
if
(
p
!=
NULL
)
{
TFileReader
*
oldReader
=
*
p
;
taosHashRemove
(
tcache
->
tableCache
,
buf
,
s
trlen
(
buf
)
);
taosHashRemove
(
tcache
->
tableCache
,
buf
,
s
z
);
oldReader
->
remove
=
true
;
tfileReaderUnRef
(
oldReader
);
}
taosHashPut
(
tcache
->
tableCache
,
buf
,
sz
,
&
reader
,
sizeof
(
void
*
));
tfileReaderRef
(
reader
);
taosHashPut
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
),
&
reader
,
sizeof
(
void
*
));
return
;
}
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
)
{
...
...
@@ -201,12 +204,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
}
TFileWriter
*
tfileWriterOpen
(
char
*
path
,
uint64_t
suid
,
int32_t
version
,
const
char
*
colName
,
uint8_t
colType
)
{
char
filename
[
128
]
=
{
0
};
int32_t
coldId
=
1
;
tfileGenFileName
(
filename
,
suid
,
coldId
,
version
);
char
fullname
[
256
]
=
{
0
};
snprintf
(
fullname
,
sizeof
(
fullname
),
"%s/%s"
,
path
,
filename
);
tfileGenFileFullName
(
fullname
,
path
,
suid
,
colName
,
version
);
// indexInfo("open write file name %s", fullname);
WriterCtx
*
wcx
=
writerCtxCreate
(
TFile
,
fullname
,
false
,
1024
*
1024
*
64
);
if
(
wcx
==
NULL
)
{
return
NULL
;
}
...
...
@@ -219,19 +219,15 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
return
tfileWriterCreate
(
wcx
,
&
tfh
);
}
TFileReader
*
tfileReaderOpen
(
char
*
path
,
uint64_t
suid
,
int32_t
version
,
const
char
*
colName
)
{
char
filename
[
128
]
=
{
0
};
int32_t
coldId
=
1
;
tfileGenFileName
(
filename
,
suid
,
coldId
,
version
);
char
fullname
[
256
]
=
{
0
};
snprintf
(
fullname
,
sizeof
(
fullname
),
"%s/%s"
,
path
,
filename
);
tfileGenFileFullName
(
fullname
,
path
,
suid
,
colName
,
version
);
WriterCtx
*
wc
=
writerCtxCreate
(
TFile
,
fullname
,
true
,
1024
*
1024
*
1024
);
// indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
if
(
wc
==
NULL
)
{
return
NULL
;
}
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
return
reader
;
// tfileSerialCacheKey(&key, buf);
}
TFileWriter
*
tfileWriterCreate
(
WriterCtx
*
ctx
,
TFileHeader
*
header
)
{
// char pathBuf[128] = {0};
...
...
@@ -325,17 +321,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
tfileWriterClose
(
tw
);
return
-
1
;
}
// write fst
indexError
(
"--------Begin----------------"
);
// write data
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
// TODO, fst batch write later
TFileValue
*
v
=
taosArrayGetP
((
SArray
*
)
data
,
i
);
if
(
tfileWriteData
(
tw
,
v
)
==
0
)
{
//
if
(
tfileWriteData
(
tw
,
v
)
!=
0
)
{
indexError
(
"failed to write data: %s, offset: %d len: %d"
,
v
->
colVal
,
v
->
offset
,
(
int
)
taosArrayGetSize
(
v
->
tableId
));
}
else
{
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
// (int)taosArrayGetSize(v->tableId));
}
indexError
(
"data: %s, offset: %d len: %d"
,
v
->
colVal
,
v
->
offset
,
(
int
)
taosArrayGetSize
(
v
->
tableId
));
}
indexError
(
"--------End----------------"
);
fstBuilderFinish
(
tw
->
fb
);
fstBuilderDestroy
(
tw
->
fb
);
tw
->
fb
=
NULL
;
...
...
@@ -359,7 +357,8 @@ IndexTFile* indexTFileCreate(const char* path) {
tfile
->
cache
=
tfileCacheCreate
(
path
);
return
tfile
;
}
void
IndexTFileDestroy
(
IndexTFile
*
tfile
)
{
void
indexTFileDestroy
(
IndexTFile
*
tfile
)
{
if
(
tfile
==
NULL
)
{
return
;
}
tfileCacheDestroy
(
tfile
->
cache
);
free
(
tfile
);
}
...
...
@@ -369,9 +368,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
if
(
tfile
==
NULL
)
{
return
ret
;
}
IndexTFile
*
pTfile
=
(
IndexTFile
*
)
tfile
;
SIndexTerm
*
term
=
query
->
term
;
TFileCacheKey
key
=
{
.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
SIndexTerm
*
term
=
query
->
term
;
ICacheKey
key
=
{.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
TFileReader
*
reader
=
tfileCacheGet
(
pTfile
->
cache
,
&
key
);
if
(
reader
==
NULL
)
{
return
0
;
}
...
...
@@ -385,8 +383,10 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
}
static
bool
tfileIteratorNext
(
Iterate
*
iiter
)
{
IterateValue
*
iv
=
&
iiter
->
val
;
if
(
iv
->
colVal
!=
NULL
&&
iv
->
val
!=
NULL
)
{
// indexError("value in fst: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
}
iterateValueDestroy
(
iv
,
false
);
// SArray* tblIds = iv->val;
char
*
colVal
=
NULL
;
uint64_t
offset
=
0
;
...
...
@@ -406,14 +406,14 @@ static bool tfileIteratorNext(Iterate* iiter) {
if
(
tfileReaderLoadTableIds
(
tIter
->
rdr
,
offset
,
iv
->
val
)
!=
0
)
{
return
false
;
}
iv
->
colVal
=
colVal
;
return
true
;
// std::string key(ch, sz);
}
static
IterateValue
*
tifileIterateGetValue
(
Iterate
*
iter
)
{
return
&
iter
->
val
;
}
static
TFileFstIter
*
tfileFstIteratorCreate
(
TFileReader
*
reader
)
{
TFileFstIter
*
tIter
=
calloc
(
1
,
sizeof
(
Iterate
));
TFileFstIter
*
tIter
=
calloc
(
1
,
sizeof
(
TFileFstIter
));
if
(
tIter
==
NULL
)
{
return
NULL
;
}
tIter
->
ctx
=
automCtxCreate
(
NULL
,
AUTOMATION_ALWAYS
);
...
...
@@ -435,6 +435,7 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
iter
->
next
=
tfileIteratorNext
;
iter
->
getValue
=
tifileIterateGetValue
;
iter
->
val
.
val
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
iter
->
val
.
colVal
=
NULL
;
return
iter
;
}
void
tfileIteratorDestroy
(
Iterate
*
iter
)
{
...
...
@@ -447,13 +448,14 @@ void tfileIteratorDestroy(Iterate* iter) {
streamWithStateDestroy
(
tIter
->
st
);
fstStreamBuilderDestroy
(
tIter
->
fb
);
automCtxDestroy
(
tIter
->
ctx
);
free
(
tIter
);
free
(
iter
);
}
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
char
*
colName
)
{
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
uint64_t
suid
,
char
*
colName
)
{
if
(
tf
==
NULL
)
{
return
NULL
;
}
TFileCacheKey
key
=
{.
suid
=
0
,
.
colType
=
TSDB_DATA_TYPE_BINARY
,
.
colName
=
colName
,
.
nColName
=
strlen
(
colName
)};
ICacheKey
key
=
{.
suid
=
suid
,
.
colType
=
TSDB_DATA_TYPE_BINARY
,
.
colName
=
colName
,
.
nColName
=
strlen
(
colName
)};
return
tfileCacheGet
(
tf
->
cache
,
&
key
);
}
...
...
@@ -480,7 +482,7 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
TFileValue
*
tfileValueCreate
(
char
*
val
)
{
TFileValue
*
tf
=
calloc
(
1
,
sizeof
(
TFileValue
));
if
(
tf
==
NULL
)
{
return
NULL
;
}
tf
->
colVal
=
val
;
tf
->
colVal
=
tstrdup
(
val
)
;
tf
->
tableId
=
taosArrayInit
(
32
,
sizeof
(
uint64_t
));
return
tf
;
}
...
...
@@ -491,6 +493,7 @@ int tfileValuePush(TFileValue* tf, uint64_t val) {
}
void
tfileValueDestroy
(
TFileValue
*
tf
)
{
taosArrayDestroy
(
tf
->
tableId
);
free
(
tf
->
colVal
);
free
(
tf
);
}
static
void
tfileSerialTableIdsToBuf
(
char
*
buf
,
SArray
*
ids
)
{
...
...
@@ -545,6 +548,9 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
//
indexError
(
"actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
errno
,
reader
->
ctx
->
file
.
fd
,
reader
->
ctx
->
file
.
buf
);
}
else
{
indexError
(
"actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
errno
,
reader
->
ctx
->
file
.
fd
,
reader
->
ctx
->
file
.
buf
);
}
// assert(nread == sizeof(buf));
memcpy
(
&
reader
->
header
,
buf
,
sizeof
(
buf
));
...
...
@@ -553,13 +559,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
}
static
int
tfileReaderLoadFst
(
TFileReader
*
reader
)
{
// current load fst into memory, refactor it later
static
int
FST_MAX_SIZE
=
64
*
1024
;
static
int
FST_MAX_SIZE
=
64
*
1024
*
1024
;
char
*
buf
=
calloc
(
1
,
sizeof
(
char
)
*
FST_MAX_SIZE
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
WriterCtx
*
ctx
=
reader
->
ctx
;
int32_t
nread
=
ctx
->
readFrom
(
ctx
,
buf
,
FST_MAX_SIZE
,
reader
->
header
.
fstOffset
);
indexError
(
"nread = %d, and fst offset=%d, filename: %s "
,
nread
,
reader
->
header
.
fstOffset
,
ctx
->
file
.
buf
);
// we assuse fst size less than FST_MAX_SIZE
assert
(
nread
>
0
&&
nread
<
FST_MAX_SIZE
);
...
...
@@ -603,19 +610,26 @@ void tfileReaderUnRef(TFileReader* reader) {
}
}
static
int
tfileGetFileList
(
const
char
*
path
,
SArray
*
result
)
{
static
SArray
*
tfileGetFileList
(
const
char
*
path
)
{
SArray
*
files
=
taosArrayInit
(
4
,
sizeof
(
void
*
));
DIR
*
dir
=
opendir
(
path
);
if
(
NULL
==
dir
)
{
return
-
1
;
}
if
(
NULL
==
dir
)
{
return
NULL
;
}
struct
dirent
*
entry
;
while
((
entry
=
readdir
(
dir
))
!=
NULL
)
{
if
(
entry
->
d_type
&&
DT_DIR
)
{
continue
;
}
size_t
len
=
strlen
(
entry
->
d_name
);
char
*
buf
=
calloc
(
1
,
len
+
1
);
memcpy
(
buf
,
entry
->
d_name
,
len
);
taosArrayPush
(
result
,
&
buf
);
taosArrayPush
(
files
,
&
buf
);
}
closedir
(
dir
);
return
0
;
taosArraySort
(
files
,
tfileCompare
);
tfileRmExpireFile
(
files
);
return
files
;
}
static
int
tfileRmExpireFile
(
SArray
*
result
)
{
// TODO(yihao): remove expire tindex after restart
...
...
@@ -636,22 +650,21 @@ static int tfileCompare(const void* a, const void* b) {
if
(
ret
==
0
)
{
return
ret
;
}
return
ret
<
0
?
-
1
:
1
;
}
// tfile name suid-colId-version.tindex
static
void
tfileGenFileName
(
char
*
filename
,
uint64_t
suid
,
int
colId
,
int
version
)
{
sprintf
(
filename
,
"%"
PRIu64
"-%d-%d.tindex"
,
suid
,
colId
,
version
);
return
;
}
static
int
tfileParseFileName
(
const
char
*
filename
,
uint64_t
*
suid
,
int
*
colId
,
int
*
version
)
{
if
(
3
==
sscanf
(
filename
,
"%"
PRIu64
"-%d-%d.tindex"
,
suid
,
colId
,
version
))
{
static
int
tfileParseFileName
(
const
char
*
filename
,
uint64_t
*
suid
,
char
*
col
,
int
*
version
)
{
if
(
3
==
sscanf
(
filename
,
"%"
PRIu64
"-%[^-]-%d.tindex"
,
suid
,
col
,
version
))
{
// read suid & colid & version success
return
0
;
}
return
-
1
;
}
static
void
tfileSerialCacheKey
(
TFileCacheKey
*
key
,
char
*
buf
)
{
// SERIALIZE_MEM_TO_BUF(buf, key, suid);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF
(
buf
,
key
,
colName
,
key
->
nColName
);
// tfile name suid-colId-version.tindex
static
void
tfileGenFileName
(
char
*
filename
,
uint64_t
suid
,
const
char
*
col
,
int
version
)
{
sprintf
(
filename
,
"%"
PRIu64
"-%s-%d.tindex"
,
suid
,
col
,
version
);
return
;
}
static
void
tfileGenFileFullName
(
char
*
fullname
,
const
char
*
path
,
uint64_t
suid
,
const
char
*
col
,
int32_t
version
)
{
char
filename
[
128
]
=
{
0
};
tfileGenFileName
(
filename
,
suid
,
col
,
version
);
sprintf
(
fullname
,
"%s/%s"
,
path
,
filename
);
}
source/libs/index/test/fstTest.cc
浏览文件 @
144358f0
...
...
@@ -24,8 +24,13 @@ class FstWriter {
_b
=
fstBuilderCreate
(
_wc
,
0
);
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
// char buf[128] = {0};
// int len = 0;
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstBuilderInsert
(
_b
,
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
...
...
@@ -61,6 +66,11 @@ class FstReadMemory {
return
_fst
!=
NULL
;
}
bool
Get
(
const
std
::
string
&
key
,
uint64_t
*
val
)
{
// char buf[128] = {0};
// int len = 0;
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstGet
(
_fst
,
&
skey
,
val
);
fstSliceDestroy
(
&
skey
);
...
...
@@ -135,15 +145,109 @@ int Performance_fstWriteRecords(FstWriter* b) {
}
return
L
*
M
*
N
;
}
void
Performance_fstReadRecords
(
FstReadMemory
*
m
)
{
std
::
string
str
(
"aa"
);
for
(
int
i
=
0
;
i
<
M
;
i
++
)
{
str
[
0
]
=
'a'
+
i
;
str
.
resize
(
2
);
for
(
int
j
=
0
;
j
<
N
;
j
++
)
{
str
[
1
]
=
'a'
+
j
;
str
.
resize
(
2
);
for
(
int
k
=
0
;
k
<
L
;
k
++
)
{
str
.
push_back
(
'a'
);
uint64_t
val
,
cost
;
if
(
m
->
GetWithTimeCostUs
(
str
,
&
val
,
&
cost
))
{
printf
(
"succes to get kv(%s, %"
PRId64
"), cost: %"
PRId64
"
\n
"
,
str
.
c_str
(),
val
,
cost
);
}
else
{
printf
(
"failed to get key: %s
\n
"
,
str
.
c_str
());
}
}
}
}
}
void
checkMillonWriteAndReadOfFst
()
{
tfInit
();
FstWriter
*
fw
=
new
FstWriter
;
Performance_fstWriteRecords
(
fw
);
delete
fw
;
FstReadMemory
*
fr
=
new
FstReadMemory
(
1024
*
64
*
1024
);
if
(
fr
->
init
())
{
printf
(
"success to init fst read"
);
}
Performance_fstReadRecords
(
fr
);
tfCleanup
();
delete
fr
;
}
void
checkFstLongTerm
()
{
tfInit
();
FstWriter
*
fw
=
new
FstWriter
;
// Performance_fstWriteRecords(fw);
fw
->
Put
(
"A B"
,
1
);
fw
->
Put
(
"C"
,
2
);
fw
->
Put
(
"a"
,
3
);
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
()
==
false
)
{
std
::
cout
<<
"init readMemory failed"
<<
std
::
endl
;
delete
m
;
return
;
}
{
uint64_t
val
=
0
;
if
(
m
->
Get
(
"A B"
,
&
val
))
{
std
::
cout
<<
"success to Get: "
<<
val
<<
std
::
endl
;
}
else
{
std
::
cout
<<
"failed to Get:"
<<
val
<<
std
::
endl
;
}
}
{
uint64_t
val
=
0
;
if
(
m
->
Get
(
"C"
,
&
val
))
{
std
::
cout
<<
"success to Get: "
<<
val
<<
std
::
endl
;
}
else
{
std
::
cout
<<
"failed to Get:"
<<
val
<<
std
::
endl
;
}
}
{
uint64_t
val
=
0
;
if
(
m
->
Get
(
"a"
,
&
val
))
{
std
::
cout
<<
"success to Get: "
<<
val
<<
std
::
endl
;
}
else
{
std
::
cout
<<
"failed to Get:"
<<
val
<<
std
::
endl
;
}
}
// prefix search
// std::vector<uint64_t> result;
// AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS);
// m->Search(ctx, result);
// std::cout << "size: " << result.size() << std::endl;
// assert(result.size() == count);
// for (int i = 0; i < result.size(); i++) {
// assert(result[i] == i); // check result
//}
tfCleanup
();
// free(ctx);
// delete m;
}
void
checkFstCheckIterator
()
{
tfInit
();
FstWriter
*
fw
=
new
FstWriter
;
int64_t
s
=
taosGetTimestampUs
();
int
count
=
2
;
Performance_fstWriteRecords
(
fw
);
//
Performance_fstWriteRecords(fw);
int64_t
e
=
taosGetTimestampUs
();
std
::
cout
<<
"insert data count : "
<<
count
<<
"elapas time: "
<<
e
-
s
<<
std
::
endl
;
fw
->
Put
(
"Hello world"
,
1
);
fw
->
Put
(
"hello world"
,
2
);
fw
->
Put
(
"hello worle"
,
3
);
fw
->
Put
(
"hello worlf"
,
4
);
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
...
...
@@ -171,7 +275,7 @@ void checkFstCheckIterator() {
void
fst_get
(
Fst
*
fst
)
{
for
(
int
i
=
0
;
i
<
10000
;
i
++
)
{
std
::
string
term
=
"Hello"
;
std
::
string
term
=
"Hello
World
"
;
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)
term
.
c_str
(),
term
.
size
());
uint64_t
offset
=
0
;
bool
ret
=
fstGet
(
fst
,
&
key
,
&
offset
);
...
...
@@ -189,7 +293,7 @@ void validateTFile(char* arg) {
std
::
thread
threads
[
NUM_OF_THREAD
];
// std::vector<std::thread> threads;
TFileReader
*
reader
=
tfileReaderOpen
(
arg
,
0
,
295868
,
"tag1"
);
TFileReader
*
reader
=
tfileReaderOpen
(
arg
,
0
,
999992
,
"tag1"
);
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
threads
[
i
]
=
std
::
thread
(
fst_get
,
reader
->
fst
);
...
...
@@ -203,9 +307,12 @@ void validateTFile(char* arg) {
tfCleanup
();
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
>
1
)
{
validateTFile
(
argv
[
1
]);
}
// tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); }
// checkFstCheckIterator();
// checkFstLongTerm();
// checkFstPrefixSearch();
checkMillonWriteAndReadOfFst
();
return
1
;
}
source/libs/index/test/indexTests.cc
浏览文件 @
144358f0
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <string>
#include <thread>
...
...
@@ -457,7 +458,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
// taosArrayPush(data, &v4);
fObj
->
Put
(
data
);
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
data
);
i
++
)
{
destroyTFileValue
(
taosArrayGetP
(
data
,
i
));
}
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
data
);
i
++
)
{
// data
destroyTFileValue
(
taosArrayGetP
(
data
,
i
));
}
taosArrayDestroy
(
data
);
std
::
string
colName
(
"voltage"
);
...
...
@@ -470,6 +474,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
fObj
->
Get
(
&
query
,
result
);
assert
(
taosArrayGetSize
(
result
)
==
200
);
indexTermDestroy
(
term
);
taosArrayDestroy
(
result
);
// tfileWriterDestroy(twrite);
}
...
...
@@ -477,7 +482,7 @@ class CacheObj {
public:
CacheObj
()
{
// TODO
cache
=
indexCacheCreate
(
NULL
,
"voltage"
,
TSDB_DATA_TYPE_BINARY
);
cache
=
indexCacheCreate
(
NULL
,
0
,
"voltage"
,
TSDB_DATA_TYPE_BINARY
);
}
int
Put
(
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
)
{
int
ret
=
indexCachePut
(
cache
,
term
,
uid
);
...
...
@@ -534,6 +539,7 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
indexTermDestroy
(
term
);
// indexTermDestry(term);
}
{
...
...
@@ -541,24 +547,28 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
indexTermDestroy
(
term
);
}
{
std
::
string
colVal
(
"v2"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
indexTermDestroy
(
term
);
}
{
std
::
string
colVal
(
"v3"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
indexTermDestroy
(
term
);
}
{
std
::
string
colVal
(
"v3"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
indexTermDestroy
(
term
);
}
coj
->
Debug
();
std
::
cout
<<
"--------first----------"
<<
std
::
endl
;
...
...
@@ -567,12 +577,14 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
othColId
,
version
++
,
suid
++
);
indexTermDestroy
(
term
);
}
{
std
::
string
colVal
(
"v4"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
othColId
,
version
++
,
suid
++
);
indexTermDestroy
(
term
);
}
coj
->
Debug
();
std
::
cout
<<
"--------second----------"
<<
std
::
endl
;
...
...
@@ -583,6 +595,7 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
indexTermDestroy
(
term
);
}
}
coj
->
Debug
();
...
...
@@ -598,6 +611,9 @@ TEST_F(IndexCacheEnv, cache_test) {
coj
->
Get
(
&
query
,
colId
,
10000
,
ret
,
&
valType
);
std
::
cout
<<
"size : "
<<
taosArrayGetSize
(
ret
)
<<
std
::
endl
;
assert
(
taosArrayGetSize
(
ret
)
==
4
);
taosArrayDestroy
(
ret
);
indexTermDestroy
(
term
);
}
{
std
::
string
colVal
(
"v2"
);
...
...
@@ -609,6 +625,9 @@ TEST_F(IndexCacheEnv, cache_test) {
coj
->
Get
(
&
query
,
colId
,
10000
,
ret
,
&
valType
);
assert
(
taosArrayGetSize
(
ret
)
==
1
);
taosArrayDestroy
(
ret
);
indexTermDestroy
(
term
);
}
}
class
IndexObj
{
...
...
@@ -620,7 +639,7 @@ class IndexObj {
indexInit
();
}
int
Init
(
const
std
::
string
&
dir
)
{
taosRemoveDir
(
dir
.
c_str
());
//
taosRemoveDir(dir.c_str());
taosMkDir
(
dir
.
c_str
());
int
ret
=
indexOpen
(
&
opts
,
dir
.
c_str
(),
&
idx
);
if
(
ret
!=
0
)
{
...
...
@@ -645,10 +664,11 @@ class IndexObj {
int
WriteMultiMillonData
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
=
"Hello world"
,
size_t
numOfTable
=
100
*
10000
)
{
std
::
string
tColVal
=
colVal
;
size_t
colValSize
=
tColVal
.
size
();
for
(
int
i
=
0
;
i
<
numOfTable
;
i
++
)
{
tColVal
[
tColVal
.
size
()
-
1
]
=
'a'
+
i
%
26
;
tColVal
[
i
%
colValSize
]
=
'a'
+
i
%
26
;
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
c
olVal
.
size
());
tColVal
.
c_str
(),
tC
olVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
10
;
i
++
)
{
...
...
@@ -677,14 +697,23 @@ class IndexObj {
indexMultiTermQueryAdd
(
mq
,
term
,
QUERY_TERM
);
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
if
(
Search
(
mq
,
result
)
==
0
)
{
std
::
cout
<<
"search one successfully"
<<
std
::
endl
;
}
return
taosArrayGetSize
(
result
);
int64_t
s
=
taosGetTimestampUs
();
if
(
Search
(
mq
,
result
)
==
0
)
{
int64_t
e
=
taosGetTimestampUs
();
std
::
cout
<<
"search one successfully and time cost:"
<<
e
-
s
<<
std
::
endl
;
}
else
{
}
int
sz
=
taosArrayGetSize
(
result
);
indexMultiTermQueryDestroy
(
mq
);
taosArrayDestroy
(
result
);
return
sz
;
// assert(taosArrayGetSize(result) == targetSize);
}
void
PutOne
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
)
{
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
Put
(
terms
,
10
);
indexMultiTermDestroy
(
terms
);
...
...
@@ -783,18 +812,21 @@ TEST_F(IndexEnv2, testIndexOpen) {
index
->
Search
(
mq
,
result
);
std
::
cout
<<
"target size: "
<<
taosArrayGetSize
(
result
)
<<
std
::
endl
;
assert
(
taosArrayGetSize
(
result
)
==
400
);
taosArrayDestroy
(
result
);
indexMultiTermQueryDestroy
(
mq
);
}
}
TEST_F
(
IndexEnv2
,
testIndex_TrigeFlush
)
{
std
::
string
path
=
"/tmp/test"
;
std
::
string
path
=
"/tmp/test
xxx
"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
// r
std
::
cout
<<
"failed to init"
<<
std
::
endl
;
}
int
numOfTable
=
100
*
10000
;
index
->
WriteMillonData
(
"tag1"
,
"Hello"
,
numOfTable
);
int
target
=
index
->
SearchOne
(
"tag1"
,
"Hello"
);
index
->
WriteMillonData
(
"tag1"
,
"Hello Wolrd"
,
numOfTable
);
int
target
=
index
->
SearchOne
(
"tag1"
,
"Hello Wolrd"
);
std
::
cout
<<
"Get Index: "
<<
target
<<
std
::
endl
;
assert
(
numOfTable
==
target
);
}
...
...
@@ -802,6 +834,10 @@ static void write_and_search(IndexObj* idx) {
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
int
target
=
idx
->
SearchOne
(
"tag1"
,
"Hello"
);
std
::
cout
<<
"search: "
<<
target
<<
std
::
endl
;
target
=
idx
->
SearchOne
(
"tag2"
,
"Test"
);
std
::
cout
<<
"search: "
<<
target
<<
std
::
endl
;
idx
->
PutOne
(
colName
,
colVal
);
}
TEST_F
(
IndexEnv2
,
testIndex_serarch_cache_and_tfile
)
{
...
...
@@ -809,7 +845,10 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
if
(
index
->
Init
(
path
)
!=
0
)
{
// opt
}
index
->
WriteMultiMillonData
(
"tag1"
,
"Hello"
,
200000
);
index
->
PutOne
(
"tag1"
,
"Hello"
);
index
->
PutOne
(
"tag2"
,
"Test"
);
index
->
WriteMultiMillonData
(
"tag1"
,
"Hello"
,
50
*
10000
);
index
->
WriteMultiMillonData
(
"tag2"
,
"Test"
,
50
*
10000
);
std
::
thread
threads
[
NUM_OF_THREAD
];
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
...
...
@@ -821,25 +860,17 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
threads
[
i
].
join
();
}
}
TEST_F
(
IndexEnv2
,
testIndex_multi_thread_write
)
{
std
::
string
path
=
"/tmp"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
}
TEST_F
(
IndexEnv2
,
testIndex_multi_thread_read
)
{
std
::
string
path
=
"/tmp"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
}
TEST_F
(
IndexEnv2
,
testIndex_restart
)
{
std
::
string
path
=
"/tmp"
;
std
::
string
path
=
"/tmp
/test1
"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
}
TEST_F
(
IndexEnv2
,
testIndex_performance
)
{
std
::
string
path
=
"/tmp"
;
std
::
string
path
=
"/tmp
/test2
"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
}
TEST_F
(
IndexEnv2
,
testIndexMultiTag
)
{
std
::
string
path
=
"/tmp"
;
std
::
string
path
=
"/tmp
/test3
"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
}
source/libs/parser/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util catalog function transport qcom
)
ADD_SUBDIRECTORY
(
test
)
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
source/libs/planner/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util catalog cjson parser transport function qcom
)
ADD_SUBDIRECTORY
(
test
)
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
source/libs/qcom/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -7,8 +7,10 @@ target_include_directories(
)
target_link_libraries
(
qcom
PRIVATE os util transport
qcom
PRIVATE os util transport
)
ADD_SUBDIRECTORY
(
test
)
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
source/libs/qworker/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -11,4 +11,6 @@ target_link_libraries(
PRIVATE os util transport planner qcom
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
\ No newline at end of file
source/libs/scheduler/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -12,4 +12,6 @@ target_link_libraries(
PRIVATE os util planner qcom common catalog transport
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
\ No newline at end of file
source/util/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -14,4 +14,7 @@ target_link_libraries(
PUBLIC api
)
ADD_SUBDIRECTORY
(
test
)
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
source/util/src/encode.c
0 → 100644
浏览文件 @
144358f0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "encode.h"
#if __STDC_VERSION__ >= 201112L
static_assert
(
sizeof
(
float
)
==
sizeof
(
uint32_t
),
"sizeof(float) must equal to sizeof(uint32_t)"
);
static_assert
(
sizeof
(
double
)
==
sizeof
(
uint64_t
),
"sizeof(double) must equal to sizeof(uint64_t)"
);
#endif
void
tCoderInit
(
SCoder
*
pCoder
,
td_endian_t
endian
,
uint8_t
*
data
,
int32_t
size
,
td_coder_t
type
)
{
if
(
type
==
TD_ENCODER
)
{
if
(
data
==
NULL
)
size
=
0
;
}
else
{
ASSERT
(
data
&&
size
>
0
);
}
pCoder
->
type
=
type
;
pCoder
->
endian
=
endian
;
pCoder
->
data
=
data
;
pCoder
->
size
=
size
;
pCoder
->
pos
=
0
;
tFreeListInit
(
&
(
pCoder
->
fl
));
TD_SLIST_INIT
(
&
(
pCoder
->
stack
));
}
void
tCoderClear
(
SCoder
*
pCoder
)
{
tFreeListClear
(
&
(
pCoder
->
fl
));
struct
SCoderNode
*
pNode
;
for
(;;)
{
pNode
=
TD_SLIST_HEAD
(
&
(
pCoder
->
stack
));
if
(
pNode
==
NULL
)
break
;
TD_SLIST_POP
(
&
(
pCoder
->
stack
));
free
(
pNode
);
}
}
int
tStartEncode
(
SCoder
*
pCoder
)
{
struct
SCoderNode
*
pNode
;
ASSERT
(
pCoder
->
type
==
TD_ENCODER
);
if
(
pCoder
->
data
)
{
if
(
pCoder
->
size
-
pCoder
->
pos
<
sizeof
(
int32_t
))
return
-
1
;
pNode
=
malloc
(
sizeof
(
*
pNode
));
if
(
pNode
==
NULL
)
return
-
1
;
pNode
->
data
=
pCoder
->
data
;
pNode
->
pos
=
pCoder
->
pos
;
pNode
->
size
=
pCoder
->
size
;
pCoder
->
data
=
pNode
->
data
+
pNode
->
pos
+
sizeof
(
int32_t
);
pCoder
->
pos
=
0
;
pCoder
->
size
=
pNode
->
size
-
pNode
->
pos
-
sizeof
(
int32_t
);
TD_SLIST_PUSH
(
&
(
pCoder
->
stack
),
pNode
);
}
else
{
pCoder
->
pos
+=
sizeof
(
int32_t
);
}
return
0
;
}
void
tEndEncode
(
SCoder
*
pCoder
)
{
struct
SCoderNode
*
pNode
;
int32_t
len
;
ASSERT
(
pCoder
->
type
==
TD_ENCODER
);
if
(
pCoder
->
data
)
{
pNode
=
TD_SLIST_HEAD
(
&
(
pCoder
->
stack
));
ASSERT
(
pNode
);
TD_SLIST_POP
(
&
(
pCoder
->
stack
));
len
=
pCoder
->
pos
;
pCoder
->
data
=
pNode
->
data
;
pCoder
->
size
=
pNode
->
size
;
pCoder
->
pos
=
pNode
->
pos
;
if
(
TD_RT_ENDIAN
()
==
pCoder
->
endian
)
{
tPut
(
int32_t
,
pCoder
->
data
+
pCoder
->
pos
,
len
);
}
else
{
tRPut32
(
pCoder
->
data
+
pCoder
->
pos
,
len
);
}
TD_CODER_MOVE_POS
(
pCoder
,
len
+
sizeof
(
int32_t
));
free
(
pNode
);
}
}
int
tStartDecode
(
SCoder
*
pCoder
)
{
int32_t
len
;
struct
SCoderNode
*
pNode
;
ASSERT
(
pCoder
->
type
==
TD_DECODER
);
if
(
tDecodeI32
(
pCoder
,
&
len
)
<
0
)
return
-
1
;
pNode
=
malloc
(
sizeof
(
*
pNode
));
if
(
pNode
==
NULL
)
return
-
1
;
pNode
->
data
=
pCoder
->
data
;
pNode
->
pos
=
pCoder
->
pos
;
pNode
->
size
=
pCoder
->
size
;
pCoder
->
data
=
pNode
->
data
+
pNode
->
pos
;
pCoder
->
size
=
len
;
pCoder
->
pos
=
0
;
TD_SLIST_PUSH
(
&
(
pCoder
->
stack
),
pNode
);
return
0
;
}
void
tEndDecode
(
SCoder
*
pCoder
)
{
struct
SCoderNode
*
pNode
;
ASSERT
(
pCoder
->
type
==
TD_DECODER
);
ASSERT
(
tDecodeIsEnd
(
pCoder
));
pNode
=
TD_SLIST_HEAD
(
&
(
pCoder
->
stack
));
ASSERT
(
pNode
);
TD_SLIST_POP
(
&
(
pCoder
->
stack
));
pCoder
->
data
=
pNode
->
data
;
pCoder
->
size
=
pNode
->
size
;
pCoder
->
pos
=
pCoder
->
pos
+
pNode
->
pos
;
free
(
pNode
);
}
source/util/test/CMakeLists.txt
浏览文件 @
144358f0
...
...
@@ -41,4 +41,8 @@ target_sources(freelistTest
)
target_link_libraries
(
freelistTest os util gtest gtest_main
)
# encodeTest
add_executable
(
encodeTest
"encodeTest.cpp"
)
target_link_libraries
(
encodeTest os util gtest gtest_main
)
source/util/test/encodeTest.cpp
0 → 100644
浏览文件 @
144358f0
#include <iostream>
#include "gtest/gtest.h"
#include "encode.h"
#define BUF_SIZE 64
td_endian_t
endian_arr
[
2
]
=
{
TD_LITTLE_ENDIAN
,
TD_BIG_ENDIAN
};
static
int
encode
(
SCoder
*
pCoder
,
int8_t
val
)
{
return
tEncodeI8
(
pCoder
,
val
);
}
static
int
encode
(
SCoder
*
pCoder
,
uint8_t
val
)
{
return
tEncodeU8
(
pCoder
,
val
);
}
static
int
encode
(
SCoder
*
pCoder
,
int16_t
val
)
{
return
tEncodeI16
(
pCoder
,
val
);
}
static
int
encode
(
SCoder
*
pCoder
,
uint16_t
val
)
{
return
tEncodeU16
(
pCoder
,
val
);
}
static
int
encode
(
SCoder
*
pCoder
,
int32_t
val
)
{
return
tEncodeI32
(
pCoder
,
val
);
}
static
int
encode
(
SCoder
*
pCoder
,
uint32_t
val
)
{
return
tEncodeU32
(
pCoder
,
val
);
}
static
int
encode
(
SCoder
*
pCoder
,
int64_t
val
)
{
return
tEncodeI64
(
pCoder
,
val
);
}
static
int
encode
(
SCoder
*
pCoder
,
uint64_t
val
)
{
return
tEncodeU64
(
pCoder
,
val
);
}
static
int
decode
(
SCoder
*
pCoder
,
int8_t
*
val
)
{
return
tDecodeI8
(
pCoder
,
val
);
}
static
int
decode
(
SCoder
*
pCoder
,
uint8_t
*
val
)
{
return
tDecodeU8
(
pCoder
,
val
);
}
static
int
decode
(
SCoder
*
pCoder
,
int16_t
*
val
)
{
return
tDecodeI16
(
pCoder
,
val
);
}
static
int
decode
(
SCoder
*
pCoder
,
uint16_t
*
val
)
{
return
tDecodeU16
(
pCoder
,
val
);
}
static
int
decode
(
SCoder
*
pCoder
,
int32_t
*
val
)
{
return
tDecodeI32
(
pCoder
,
val
);
}
static
int
decode
(
SCoder
*
pCoder
,
uint32_t
*
val
)
{
return
tDecodeU32
(
pCoder
,
val
);
}
static
int
decode
(
SCoder
*
pCoder
,
int64_t
*
val
)
{
return
tDecodeI64
(
pCoder
,
val
);
}
static
int
decode
(
SCoder
*
pCoder
,
uint64_t
*
val
)
{
return
tDecodeU64
(
pCoder
,
val
);
}
static
int
encodev
(
SCoder
*
pCoder
,
int8_t
val
)
{
return
tEncodeI8
(
pCoder
,
val
);
}
static
int
encodev
(
SCoder
*
pCoder
,
uint8_t
val
)
{
return
tEncodeU8
(
pCoder
,
val
);
}
static
int
encodev
(
SCoder
*
pCoder
,
int16_t
val
)
{
return
tEncodeI16v
(
pCoder
,
val
);
}
static
int
encodev
(
SCoder
*
pCoder
,
uint16_t
val
)
{
return
tEncodeU16v
(
pCoder
,
val
);
}
static
int
encodev
(
SCoder
*
pCoder
,
int32_t
val
)
{
return
tEncodeI32v
(
pCoder
,
val
);
}
static
int
encodev
(
SCoder
*
pCoder
,
uint32_t
val
)
{
return
tEncodeU32v
(
pCoder
,
val
);
}
static
int
encodev
(
SCoder
*
pCoder
,
int64_t
val
)
{
return
tEncodeI64v
(
pCoder
,
val
);
}
static
int
encodev
(
SCoder
*
pCoder
,
uint64_t
val
)
{
return
tEncodeU64v
(
pCoder
,
val
);
}
static
int
decodev
(
SCoder
*
pCoder
,
int8_t
*
val
)
{
return
tDecodeI8
(
pCoder
,
val
);
}
static
int
decodev
(
SCoder
*
pCoder
,
uint8_t
*
val
)
{
return
tDecodeU8
(
pCoder
,
val
);
}
static
int
decodev
(
SCoder
*
pCoder
,
int16_t
*
val
)
{
return
tDecodeI16v
(
pCoder
,
val
);
}
static
int
decodev
(
SCoder
*
pCoder
,
uint16_t
*
val
)
{
return
tDecodeU16v
(
pCoder
,
val
);
}
static
int
decodev
(
SCoder
*
pCoder
,
int32_t
*
val
)
{
return
tDecodeI32v
(
pCoder
,
val
);
}
static
int
decodev
(
SCoder
*
pCoder
,
uint32_t
*
val
)
{
return
tDecodeU32v
(
pCoder
,
val
);
}
static
int
decodev
(
SCoder
*
pCoder
,
int64_t
*
val
)
{
return
tDecodeI64v
(
pCoder
,
val
);
}
static
int
decodev
(
SCoder
*
pCoder
,
uint64_t
*
val
)
{
return
tDecodeU64v
(
pCoder
,
val
);
}
template
<
typename
T
>
static
void
simple_encode_decode_func
(
bool
var_len
)
{
uint8_t
buf
[
BUF_SIZE
];
SCoder
coder
;
T
min_val
,
max_val
;
T
step
=
1
;
if
(
typeid
(
T
)
==
typeid
(
int8_t
))
{
min_val
=
INT8_MIN
;
max_val
=
INT8_MAX
;
step
=
1
;
}
else
if
(
typeid
(
T
)
==
typeid
(
uint8_t
))
{
min_val
=
0
;
max_val
=
UINT8_MAX
;
step
=
1
;
}
else
if
(
typeid
(
T
)
==
typeid
(
int16_t
))
{
min_val
=
INT16_MIN
;
max_val
=
INT16_MAX
;
step
=
1
;
}
else
if
(
typeid
(
T
)
==
typeid
(
uint16_t
))
{
min_val
=
0
;
max_val
=
UINT16_MAX
;
step
=
1
;
}
else
if
(
typeid
(
T
)
==
typeid
(
int32_t
))
{
min_val
=
INT32_MIN
;
max_val
=
INT32_MAX
;
step
=
((
T
)
1
)
<<
16
;
}
else
if
(
typeid
(
T
)
==
typeid
(
uint32_t
))
{
min_val
=
0
;
max_val
=
UINT32_MAX
;
step
=
((
T
)
1
)
<<
16
;
}
else
if
(
typeid
(
T
)
==
typeid
(
int64_t
))
{
min_val
=
INT64_MIN
;
max_val
=
INT64_MAX
;
step
=
((
T
)
1
)
<<
48
;
}
else
if
(
typeid
(
T
)
==
typeid
(
uint64_t
))
{
min_val
=
0
;
max_val
=
UINT64_MAX
;
step
=
((
T
)
1
)
<<
48
;
}
T
i
=
min_val
;
for
(;;
/*T i = min_val; i <= max_val; i += step*/
)
{
T
dval
;
// Encode NULL
for
(
td_endian_t
endian
:
endian_arr
)
{
tCoderInit
(
&
coder
,
endian
,
NULL
,
0
,
TD_ENCODER
);
if
(
var_len
)
{
GTEST_ASSERT_EQ
(
encodev
(
&
coder
,
i
),
0
);
}
else
{
GTEST_ASSERT_EQ
(
encode
(
&
coder
,
i
),
0
);
GTEST_ASSERT_EQ
(
coder
.
pos
,
sizeof
(
T
));
}
tCoderClear
(
&
coder
);
}
// Encode and decode
for
(
td_endian_t
e_endian
:
endian_arr
)
{
for
(
td_endian_t
d_endian
:
endian_arr
)
{
// Encode
tCoderInit
(
&
coder
,
e_endian
,
buf
,
BUF_SIZE
,
TD_ENCODER
);
if
(
var_len
)
{
GTEST_ASSERT_EQ
(
encodev
(
&
coder
,
i
),
0
);
}
else
{
GTEST_ASSERT_EQ
(
encode
(
&
coder
,
i
),
0
);
GTEST_ASSERT_EQ
(
coder
.
pos
,
sizeof
(
T
));
}
int32_t
epos
=
coder
.
pos
;
tCoderClear
(
&
coder
);
// Decode
tCoderInit
(
&
coder
,
d_endian
,
buf
,
BUF_SIZE
,
TD_DECODER
);
if
(
var_len
)
{
GTEST_ASSERT_EQ
(
decodev
(
&
coder
,
&
dval
),
0
);
}
else
{
GTEST_ASSERT_EQ
(
decode
(
&
coder
,
&
dval
),
0
);
GTEST_ASSERT_EQ
(
coder
.
pos
,
sizeof
(
T
));
}
GTEST_ASSERT_EQ
(
coder
.
pos
,
epos
);
if
(
typeid
(
T
)
==
typeid
(
int8_t
)
||
typeid
(
T
)
==
typeid
(
uint8_t
)
||
e_endian
==
d_endian
)
{
GTEST_ASSERT_EQ
(
i
,
dval
);
}
tCoderClear
(
&
coder
);
}
}
if
(
i
==
max_val
)
break
;
if
(
max_val
-
i
<
step
)
{
i
=
max_val
;
}
else
{
i
=
i
+
step
;
}
}
}
TEST
(
td_encode_test
,
encode_decode_fixed_len_integer
)
{
simple_encode_decode_func
<
int8_t
>
(
false
);
simple_encode_decode_func
<
uint8_t
>
(
false
);
simple_encode_decode_func
<
int16_t
>
(
false
);
simple_encode_decode_func
<
uint16_t
>
(
false
);
simple_encode_decode_func
<
int32_t
>
(
false
);
simple_encode_decode_func
<
uint32_t
>
(
false
);
simple_encode_decode_func
<
int64_t
>
(
false
);
simple_encode_decode_func
<
uint64_t
>
(
false
);
}
TEST
(
td_encode_test
,
encode_decode_variant_len_integer
)
{
simple_encode_decode_func
<
int16_t
>
(
true
);
simple_encode_decode_func
<
uint16_t
>
(
true
);
simple_encode_decode_func
<
int32_t
>
(
true
);
simple_encode_decode_func
<
uint32_t
>
(
true
);
simple_encode_decode_func
<
int64_t
>
(
true
);
simple_encode_decode_func
<
uint64_t
>
(
true
);
}
TEST
(
td_encode_test
,
encode_decode_cstr
)
{
uint8_t
*
buf
=
new
uint8_t
[
1024
*
1024
];
char
*
cstr
=
new
char
[
1024
*
1024
];
const
char
*
dcstr
;
SCoder
encoder
;
SCoder
decoder
;
for
(
size_t
i
=
0
;
i
<
1024
*
2
-
1
;
i
++
)
{
memset
(
cstr
,
'a'
,
i
);
cstr
[
i
]
=
'\0'
;
for
(
td_endian_t
endian
:
endian_arr
)
{
// Encode
tCoderInit
(
&
encoder
,
endian
,
buf
,
1024
*
1024
,
TD_ENCODER
);
GTEST_ASSERT_EQ
(
tEncodeCStr
(
&
encoder
,
cstr
),
0
);
tCoderClear
(
&
encoder
);
// Decode
tCoderInit
(
&
decoder
,
endian
,
buf
,
1024
*
1024
,
TD_DECODER
);
GTEST_ASSERT_EQ
(
tDecodeCStr
(
&
decoder
,
&
dcstr
),
0
);
GTEST_ASSERT_EQ
(
memcmp
(
dcstr
,
cstr
,
i
+
1
),
0
);
tCoderClear
(
&
decoder
);
}
}
delete
buf
;
delete
cstr
;
}
typedef
struct
{
int32_t
A_a
;
int64_t
A_b
;
char
*
A_c
;
}
SStructA_v1
;
static
int
tSStructA_v1_encode
(
SCoder
*
pCoder
,
const
SStructA_v1
*
pSAV1
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pSAV1
->
A_a
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pSAV1
->
A_b
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pSAV1
->
A_c
)
<
0
)
return
-
1
;
tEndEncode
(
pCoder
);
return
0
;
}
static
int
tSStructA_v1_decode
(
SCoder
*
pCoder
,
SStructA_v1
*
pSAV1
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSAV1
->
A_a
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSAV1
->
A_b
)
<
0
)
return
-
1
;
const
char
*
tstr
;
uint64_t
len
;
if
(
tDecodeCStrAndLen
(
pCoder
,
&
tstr
,
&
len
)
<
0
)
return
-
1
;
pSAV1
->
A_c
=
(
char
*
)
TCODER_MALLOC
(
len
+
1
,
pCoder
);
memcpy
(
pSAV1
->
A_c
,
tstr
,
len
+
1
);
tEndDecode
(
pCoder
);
return
0
;
}
typedef
struct
{
int32_t
A_a
;
int64_t
A_b
;
char
*
A_c
;
// -------------------BELOW FEILDS ARE ADDED IN A NEW VERSION--------------
int16_t
A_d
;
int16_t
A_e
;
}
SStructA_v2
;
static
int
tSStructA_v2_encode
(
SCoder
*
pCoder
,
const
SStructA_v2
*
pSAV2
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pSAV2
->
A_a
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pSAV2
->
A_b
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pSAV2
->
A_c
)
<
0
)
return
-
1
;
// ------------------------NEW FIELDS ENCODE-------------------------------
if
(
tEncodeI16
(
pCoder
,
pSAV2
->
A_d
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pCoder
,
pSAV2
->
A_e
)
<
0
)
return
-
1
;
tEndEncode
(
pCoder
);
return
0
;
}
static
int
tSStructA_v2_decode
(
SCoder
*
pCoder
,
SStructA_v2
*
pSAV2
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSAV2
->
A_a
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSAV2
->
A_b
)
<
0
)
return
-
1
;
const
char
*
tstr
;
uint64_t
len
;
if
(
tDecodeCStrAndLen
(
pCoder
,
&
tstr
,
&
len
)
<
0
)
return
-
1
;
pSAV2
->
A_c
=
(
char
*
)
TCODER_MALLOC
(
len
+
1
,
pCoder
);
memcpy
(
pSAV2
->
A_c
,
tstr
,
len
+
1
);
// ------------------------NEW FIELDS DECODE-------------------------------
if
(
!
tDecodeIsEnd
(
pCoder
))
{
if
(
tDecodeI16
(
pCoder
,
&
pSAV2
->
A_d
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pCoder
,
&
pSAV2
->
A_e
)
<
0
)
return
-
1
;
}
else
{
pSAV2
->
A_d
=
0
;
pSAV2
->
A_e
=
0
;
}
tEndDecode
(
pCoder
);
return
0
;
}
typedef
struct
{
SStructA_v1
*
pA
;
int32_t
v_a
;
int8_t
v_b
;
}
SFinalReq_v1
;
static
int
tSFinalReq_v1_encode
(
SCoder
*
pCoder
,
const
SFinalReq_v1
*
ps1
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tSStructA_v1_encode
(
pCoder
,
ps1
->
pA
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
ps1
->
v_a
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
ps1
->
v_b
)
<
0
)
return
-
1
;
tEndEncode
(
pCoder
);
return
0
;
}
static
int
tSFinalReq_v1_decode
(
SCoder
*
pCoder
,
SFinalReq_v1
*
ps1
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
ps1
->
pA
=
(
SStructA_v1
*
)
TCODER_MALLOC
(
sizeof
(
*
(
ps1
->
pA
)),
pCoder
);
if
(
tSStructA_v1_decode
(
pCoder
,
ps1
->
pA
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
ps1
->
v_a
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
ps1
->
v_b
)
<
0
)
return
-
1
;
tEndDecode
(
pCoder
);
return
0
;
}
typedef
struct
{
SStructA_v2
*
pA
;
int32_t
v_a
;
int8_t
v_b
;
// ----------------------- Feilds added -----------------------
int16_t
v_c
;
}
SFinalReq_v2
;
static
int
tSFinalReq_v2_encode
(
SCoder
*
pCoder
,
const
SFinalReq_v2
*
ps2
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tSStructA_v2_encode
(
pCoder
,
ps2
->
pA
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
ps2
->
v_a
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
ps2
->
v_b
)
<
0
)
return
-
1
;
// ----------------------- Feilds added encode -----------------------
if
(
tEncodeI16
(
pCoder
,
ps2
->
v_c
)
<
0
)
return
-
1
;
tEndEncode
(
pCoder
);
return
0
;
}
static
int
tSFinalReq_v2_decode
(
SCoder
*
pCoder
,
SFinalReq_v2
*
ps2
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
ps2
->
pA
=
(
SStructA_v2
*
)
TCODER_MALLOC
(
sizeof
(
*
(
ps2
->
pA
)),
pCoder
);
if
(
tSStructA_v2_decode
(
pCoder
,
ps2
->
pA
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
ps2
->
v_a
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
ps2
->
v_b
)
<
0
)
return
-
1
;
// ----------------------- Feilds added decode -----------------------
if
(
tDecodeIsEnd
(
pCoder
))
{
ps2
->
v_c
=
0
;
}
else
{
if
(
tDecodeI16
(
pCoder
,
&
ps2
->
v_c
)
<
0
)
return
-
1
;
}
tEndDecode
(
pCoder
);
return
0
;
}
TEST
(
td_encode_test
,
compound_struct_encode_test
)
{
SCoder
encoder
,
decoder
;
uint8_t
*
buf1
;
int32_t
buf1size
;
uint8_t
*
buf2
;
int32_t
buf2size
;
SStructA_v1
sa1
=
{.
A_a
=
10
,
.
A_b
=
65478
,
.
A_c
=
"Hello"
};
SStructA_v2
sa2
=
{.
A_a
=
10
,
.
A_b
=
65478
,
.
A_c
=
"Hello"
,
.
A_d
=
67
,
.
A_e
=
13
};
SFinalReq_v1
req1
=
{.
pA
=
&
sa1
,
.
v_a
=
15
,
.
v_b
=
35
};
SFinalReq_v2
req2
=
{.
pA
=
&
sa2
,
.
v_a
=
15
,
.
v_b
=
32
,
.
v_c
=
37
};
SFinalReq_v1
dreq1
;
SFinalReq_v2
dreq21
,
dreq22
;
// Get size
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
nullptr
,
0
,
TD_ENCODER
);
GTEST_ASSERT_EQ
(
tSFinalReq_v1_encode
(
&
encoder
,
&
req1
),
0
);
buf1size
=
encoder
.
pos
;
buf1
=
new
uint8_t
[
encoder
.
pos
];
tCoderClear
(
&
encoder
);
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
nullptr
,
0
,
TD_ENCODER
);
GTEST_ASSERT_EQ
(
tSFinalReq_v2_encode
(
&
encoder
,
&
req2
),
0
);
buf2size
=
encoder
.
pos
;
buf2
=
new
uint8_t
[
encoder
.
pos
];
tCoderClear
(
&
encoder
);
// Encode
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf1
,
buf1size
,
TD_ENCODER
);
GTEST_ASSERT_EQ
(
tSFinalReq_v1_encode
(
&
encoder
,
&
req1
),
0
);
tCoderClear
(
&
encoder
);
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf2
,
buf2size
,
TD_ENCODER
);
GTEST_ASSERT_EQ
(
tSFinalReq_v2_encode
(
&
encoder
,
&
req2
),
0
);
tCoderClear
(
&
encoder
);
// Decode
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf1
,
buf1size
,
TD_DECODER
);
GTEST_ASSERT_EQ
(
tSFinalReq_v1_decode
(
&
decoder
,
&
dreq1
),
0
);
GTEST_ASSERT_EQ
(
dreq1
.
pA
->
A_a
,
req1
.
pA
->
A_a
);
GTEST_ASSERT_EQ
(
dreq1
.
pA
->
A_b
,
req1
.
pA
->
A_b
);
GTEST_ASSERT_EQ
(
strcmp
(
dreq1
.
pA
->
A_c
,
req1
.
pA
->
A_c
),
0
);
GTEST_ASSERT_EQ
(
dreq1
.
v_a
,
req1
.
v_a
);
GTEST_ASSERT_EQ
(
dreq1
.
v_b
,
req1
.
v_b
);
tCoderClear
(
&
decoder
);
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf1
,
buf1size
,
TD_DECODER
);
GTEST_ASSERT_EQ
(
tSFinalReq_v2_decode
(
&
decoder
,
&
dreq21
),
0
);
GTEST_ASSERT_EQ
(
dreq21
.
pA
->
A_a
,
req1
.
pA
->
A_a
);
GTEST_ASSERT_EQ
(
dreq21
.
pA
->
A_b
,
req1
.
pA
->
A_b
);
GTEST_ASSERT_EQ
(
strcmp
(
dreq21
.
pA
->
A_c
,
req1
.
pA
->
A_c
),
0
);
GTEST_ASSERT_EQ
(
dreq21
.
pA
->
A_d
,
0
);
GTEST_ASSERT_EQ
(
dreq21
.
pA
->
A_e
,
0
);
GTEST_ASSERT_EQ
(
dreq21
.
v_a
,
req1
.
v_a
);
GTEST_ASSERT_EQ
(
dreq21
.
v_b
,
req1
.
v_b
);
GTEST_ASSERT_EQ
(
dreq21
.
v_c
,
0
);
tCoderClear
(
&
decoder
);
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf2
,
buf2size
,
TD_DECODER
);
GTEST_ASSERT_EQ
(
tSFinalReq_v2_decode
(
&
decoder
,
&
dreq22
),
0
);
GTEST_ASSERT_EQ
(
dreq22
.
pA
->
A_a
,
req2
.
pA
->
A_a
);
GTEST_ASSERT_EQ
(
dreq22
.
pA
->
A_b
,
req2
.
pA
->
A_b
);
GTEST_ASSERT_EQ
(
strcmp
(
dreq22
.
pA
->
A_c
,
req2
.
pA
->
A_c
),
0
);
GTEST_ASSERT_EQ
(
dreq22
.
pA
->
A_d
,
req2
.
pA
->
A_d
);
GTEST_ASSERT_EQ
(
dreq22
.
pA
->
A_e
,
req2
.
pA
->
A_e
);
GTEST_ASSERT_EQ
(
dreq22
.
v_a
,
req2
.
v_a
);
GTEST_ASSERT_EQ
(
dreq22
.
v_b
,
req2
.
v_b
);
GTEST_ASSERT_EQ
(
dreq22
.
v_c
,
req2
.
v_c
);
tCoderClear
(
&
decoder
);
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录