Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
55676139
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
55676139
编写于
1月 05, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
merge develop
上级
c5ae0c9e
a84af557
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
133 addition
and
23 deletion
+133
-23
include/os/os.h
include/os/os.h
+6
-9
include/util/tfile.h
include/util/tfile.h
+1
-1
source/libs/index/inc/index_fst_counting_writer.h
source/libs/index/inc/index_fst_counting_writer.h
+5
-0
source/libs/index/src/index.c
source/libs/index/src/index.c
+2
-1
source/libs/index/src/index_fst_counting_writer.c
source/libs/index/src/index_fst_counting_writer.c
+19
-1
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+90
-11
source/util/src/tfile.c
source/util/src/tfile.c
+10
-0
未找到文件。
include/os/os.h
浏览文件 @
55676139
...
...
@@ -22,11 +22,13 @@ extern "C" {
#include <assert.h>
#include <ctype.h>
#include <dirent.h>
#include <errno.h>
#include <float.h>
#include <inttypes.h>
#include <locale.h>
#include <math.h>
#include <sched.h>
#include <setjmp.h>
#include <signal.h>
#include <stdarg.h>
...
...
@@ -36,19 +38,14 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <wctype.h>
#include <wchar.h>
#include <sched.h>
#include <ctype.h>
#include <errno.h>
#include <float.h>
#include <math.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/utsname.h>
#include <dirent.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>
#include <sys/mman.h>
#include "osAtomic.h"
#include "osDef.h"
...
...
include/util/tfile.h
浏览文件 @
55676139
...
...
@@ -43,7 +43,7 @@ int32_t tfFsync(int64_t tfd);
bool
tfValid
(
int64_t
tfd
);
int64_t
tfLseek
(
int64_t
tfd
,
int64_t
offset
,
int32_t
whence
);
int32_t
tfFtruncate
(
int64_t
tfd
,
int64_t
length
);
void
*
tfMmapReadOnly
(
int64_t
tfd
,
int64_t
length
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/index/inc/index_fst_counting_writer.h
浏览文件 @
55676139
...
...
@@ -22,6 +22,8 @@ extern "C" {
#include "tfile.h"
//#define USE_MMAP 1
#define DefaultMem 1024 * 1024
static
char
tmpFile
[]
=
"./index"
;
...
...
@@ -39,6 +41,9 @@ typedef struct WriterCtx {
bool
readOnly
;
char
buf
[
256
];
int
size
;
#ifdef USE_MMAP
char
*
ptr
;
#endif
}
file
;
struct
{
int32_t
capa
;
...
...
source/libs/index/src/index.c
浏览文件 @
55676139
...
...
@@ -401,7 +401,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
IndexCache
*
pCache
=
(
IndexCache
*
)
cache
;
TFileReader
*
pReader
=
tfileGetReaderByCol
(
sIdx
->
tindex
,
pCache
->
suid
,
pCache
->
colName
);
if
(
pReader
==
NULL
)
{
indexWarn
(
"empty
pR
eader found"
);
}
if
(
pReader
==
NULL
)
{
indexWarn
(
"empty
tfile r
eader found"
);
}
// handle flush
Iterate
*
cacheIter
=
indexCacheIteratorCreate
(
pCache
);
Iterate
*
tfileIter
=
tfileIteratorCreate
(
pReader
);
...
...
@@ -512,6 +512,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
return
ret
;
END:
tfileWriterClose
(
tw
);
return
-
1
;
}
int32_t
indexSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
)
{
...
...
source/libs/index/src/index_fst_counting_writer.c
浏览文件 @
55676139
...
...
@@ -31,7 +31,12 @@ static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
static
int
writeCtxDoRead
(
WriterCtx
*
ctx
,
uint8_t
*
buf
,
int
len
)
{
int
nRead
=
0
;
if
(
ctx
->
type
==
TFile
)
{
#ifdef USE_MMAP
nRead
=
len
<
ctx
->
file
.
size
?
len
:
ctx
->
file
.
size
;
memcpy
(
buf
,
ctx
->
file
.
ptr
,
nRead
);
#else
nRead
=
tfRead
(
ctx
->
file
.
fd
,
buf
,
len
);
#endif
}
else
{
memcpy
(
buf
,
ctx
->
mem
.
buf
+
ctx
->
offset
,
len
);
}
...
...
@@ -43,7 +48,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
int
nRead
=
0
;
if
(
ctx
->
type
==
TFile
)
{
// tfLseek(ctx->file.fd, offset, 0);
#ifdef USE_MMAP
int32_t
last
=
ctx
->
file
.
size
-
offset
;
nRead
=
last
>=
len
?
len
:
last
;
memcpy
(
buf
,
ctx
->
file
.
ptr
+
offset
,
nRead
);
#else
nRead
=
tfPread
(
ctx
->
file
.
fd
,
buf
,
len
,
offset
);
#endif
}
else
{
// refactor later
assert
(
0
);
...
...
@@ -83,6 +94,9 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
struct
stat
fstat
;
stat
(
path
,
&
fstat
);
ctx
->
file
.
size
=
fstat
.
st_size
;
#ifdef USE_MMAP
ctx
->
file
.
ptr
=
(
char
*
)
tfMmapReadOnly
(
ctx
->
file
.
fd
,
ctx
->
file
.
size
);
#endif
}
memcpy
(
ctx
->
file
.
buf
,
path
,
strlen
(
path
));
if
(
ctx
->
file
.
fd
<
0
)
{
...
...
@@ -111,8 +125,12 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
if
(
ctx
->
type
==
TMemory
)
{
free
(
ctx
->
mem
.
buf
);
}
else
{
// ctx->flush(ctx);
tfClose
(
ctx
->
file
.
fd
);
if
(
ctx
->
file
.
readOnly
)
{
#ifdef USE_MMAP
munmap
(
ctx
->
file
.
ptr
,
ctx
->
file
.
size
);
#endif
}
if
(
remove
)
{
unlink
(
ctx
->
file
.
buf
);
}
}
free
(
ctx
);
...
...
source/libs/index/test/indexTests.cc
浏览文件 @
55676139
...
...
@@ -28,7 +28,7 @@
#include "tutil.h"
using
namespace
std
;
#define NUM_OF_THREAD
10
#define NUM_OF_THREAD
5
class
DebugInfo
{
public:
...
...
@@ -679,6 +679,17 @@ class IndexObj {
}
return
numOfTable
;
}
int
ReadMultiMillonData
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
=
"Hello world"
,
size_t
numOfTable
=
100
*
10000
)
{
std
::
string
tColVal
=
colVal
;
int
colValSize
=
tColVal
.
size
();
for
(
int
i
=
0
;
i
<
numOfTable
;
i
++
)
{
tColVal
[
i
%
colValSize
]
=
'a'
+
i
%
26
;
SearchOne
(
colName
,
tColVal
);
}
return
0
;
}
int
Put
(
SIndexMultiTerm
*
fvs
,
uint64_t
uid
)
{
numOfWrite
+=
taosArrayGetSize
(
fvs
);
...
...
@@ -698,6 +709,27 @@ class IndexObj {
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
int64_t
s
=
taosGetTimestampUs
();
if
(
Search
(
mq
,
result
)
==
0
)
{
int64_t
e
=
taosGetTimestampUs
();
std
::
cout
<<
"search and time cost:"
<<
e
-
s
<<
"
\t
query col:"
<<
colName
<<
"
\t
val: "
<<
colVal
<<
"
\t
size:"
<<
taosArrayGetSize
(
result
)
<<
std
::
endl
;
}
else
{
}
int
sz
=
taosArrayGetSize
(
result
);
indexMultiTermQueryDestroy
(
mq
);
taosArrayDestroy
(
result
);
return
sz
;
// assert(taosArrayGetSize(result) == targetSize);
}
int
SearchOneTarget
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
,
uint64_t
val
)
{
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermQueryAdd
(
mq
,
term
,
QUERY_TERM
);
SArray
*
result
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
int64_t
s
=
taosGetTimestampUs
();
if
(
Search
(
mq
,
result
)
==
0
)
{
int64_t
e
=
taosGetTimestampUs
();
...
...
@@ -708,9 +740,13 @@ class IndexObj {
int
sz
=
taosArrayGetSize
(
result
);
indexMultiTermQueryDestroy
(
mq
);
taosArrayDestroy
(
result
);
assert
(
sz
==
1
);
uint64_t
*
ret
=
(
uint64_t
*
)
taosArrayGet
(
result
,
0
);
assert
(
val
=
*
ret
);
return
sz
;
// assert(taosArrayGetSize(result) == targetSize);
}
void
PutOne
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
)
{
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
...
...
@@ -719,6 +755,14 @@ class IndexObj {
Put
(
terms
,
10
);
indexMultiTermDestroy
(
terms
);
}
void
PutOneTarge
(
const
std
::
string
&
colName
,
const
std
::
string
&
colVal
,
uint64_t
val
)
{
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermAdd
(
terms
,
term
);
Put
(
terms
,
val
);
indexMultiTermDestroy
(
terms
);
}
void
Debug
()
{
std
::
cout
<<
"numOfWrite:"
<<
numOfWrite
<<
std
::
endl
;
std
::
cout
<<
"numOfRead:"
<<
numOfRead
<<
std
::
endl
;
...
...
@@ -831,12 +875,15 @@ TEST_F(IndexEnv2, testIndex_TrigeFlush) {
assert
(
numOfTable
==
target
);
}
static
void
write_and_search
(
IndexObj
*
idx
)
{
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello"
);
static
void
single_write_and_search
(
IndexObj
*
idx
)
{
int
target
=
idx
->
SearchOne
(
"tag1"
,
"Hello"
);
target
=
idx
->
SearchOne
(
"tag2"
,
"Test"
);
}
static
void
multi_write_and_search
(
IndexObj
*
idx
)
{
int
target
=
idx
->
SearchOne
(
"tag1"
,
"Hello"
);
target
=
idx
->
SearchOne
(
"tag2"
,
"Test"
);
// idx->PutOne(colName, colVal);
idx
->
WriteMultiMillonData
(
"tag1"
,
"Hello"
,
100
*
10000
);
idx
->
WriteMultiMillonData
(
"tag2"
,
"Test"
,
100
*
10000
);
}
TEST_F
(
IndexEnv2
,
testIndex_serarch_cache_and_tfile
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
...
...
@@ -851,7 +898,21 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
//
threads
[
i
]
=
std
::
thread
(
write_and_search
,
index
);
threads
[
i
]
=
std
::
thread
(
single_write_and_search
,
index
);
}
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
// TOD
threads
[
i
].
join
();
}
}
TEST_F
(
IndexEnv2
,
testIndex_MultiWrite_and_MultiRead
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
std
::
thread
threads
[
NUM_OF_THREAD
];
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
//
threads
[
i
]
=
std
::
thread
(
multi_write_and_search
,
index
);
}
for
(
int
i
=
0
;
i
<
NUM_OF_THREAD
;
i
++
)
{
// TOD
...
...
@@ -860,15 +921,33 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
}
TEST_F
(
IndexEnv2
,
testIndex_restart
)
{
std
::
string
path
=
"/tmp/
test1
"
;
std
::
string
path
=
"/tmp/
cache_and_tfile
"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
index
->
SearchOneTarget
(
"tag1"
,
"Hello"
,
10
);
index
->
SearchOneTarget
(
"tag2"
,
"Test"
,
10
);
}
TEST_F
(
IndexEnv2
,
testIndex_performance
)
{
std
::
string
path
=
"/tmp/
test2
"
;
TEST_F
(
IndexEnv2
,
testIndex_
read_
performance
)
{
std
::
string
path
=
"/tmp/
cache_and_tfile
"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
12
);
index
->
PutOneTarge
(
"tag1"
,
"Hello"
,
15
);
index
->
ReadMultiMillonData
(
"tag1"
,
"Hello"
);
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag1"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndexMultiTag
)
{
std
::
string
path
=
"/tmp/test3"
;
std
::
string
path
=
"/tmp/multi_tag"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
index
->
WriteMultiMillonData
(
"tag1"
,
"Hello"
,
100
*
10000
);
index
->
WriteMultiMillonData
(
"tag2"
,
"Test"
,
100
*
10000
);
index
->
WriteMultiMillonData
(
"tag3"
,
"Test"
,
100
*
10000
);
index
->
WriteMultiMillonData
(
"tag4"
,
"Test"
,
100
*
10000
);
}
TEST_F
(
IndexEnv2
,
testLongComVal
)
{
std
::
string
path
=
"/tmp/long_colVal"
;
if
(
index
->
Init
(
path
)
!=
0
)
{}
// gen colVal by randstr
std
::
string
randstr
=
"xxxxxxxxxxxxxxxxx"
;
index
->
WriteMultiMillonData
(
"tag1"
,
randstr
,
100
*
10000
);
}
source/util/src/tfile.c
浏览文件 @
55676139
...
...
@@ -158,3 +158,13 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) {
taosReleaseRef
(
tsFileRsetId
,
tfd
);
return
code
;
}
void
*
tfMmapReadOnly
(
int64_t
tfd
,
int64_t
length
)
{
void
*
p
=
taosAcquireRef
(
tsFileRsetId
,
tfd
);
if
(
p
==
NULL
)
return
NULL
;
int32_t
fd
=
(
int32_t
)(
uintptr_t
)
p
;
void
*
ptr
=
mmap
(
NULL
,
length
,
PROT_READ
,
MAP_SHARED
,
fd
,
0
);
taosReleaseRef
(
tsFileRsetId
,
tfd
);
return
ptr
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录