Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
c3671a59
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
c3671a59
编写于
6月 23, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
6月 23, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2433 from taosdata/feature/2.0tsdb
Feature/2.0tsdb
上级
bc651191
90f44e04
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
125 addition
and
242 deletion
+125
-242
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+2
-2
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+62
-28
src/tsdb/tests/CMakeLists.txt
src/tsdb/tests/CMakeLists.txt
+1
-1
src/tsdb/tests/tsdbTests.cpp
src/tsdb/tests/tsdbTests.cpp
+50
-201
src/util/src/tkvstore.c
src/util/src/tkvstore.c
+10
-10
未找到文件。
src/tsdb/src/tsdbMain.c
浏览文件 @
c3671a59
...
@@ -179,7 +179,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
...
@@ -179,7 +179,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
return
-
1
;
return
-
1
;
}
}
}
}
pRsp
->
affectedRows
=
htonl
(
affectedrows
);
if
(
pRsp
!=
NULL
)
pRsp
->
affectedRows
=
htonl
(
affectedrows
);
return
0
;
return
0
;
}
}
...
@@ -648,7 +648,7 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
...
@@ -648,7 +648,7 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
}
}
pRepo
->
config
=
*
pCfg
;
pRepo
->
config
=
*
pCfg
;
pRepo
->
appH
=
*
pAppH
;
if
(
pAppH
)
pRepo
->
appH
=
*
pAppH
;
pRepo
->
tsdbMeta
=
tsdbNewMeta
(
pCfg
);
pRepo
->
tsdbMeta
=
tsdbNewMeta
(
pCfg
);
if
(
pRepo
->
tsdbMeta
==
NULL
)
{
if
(
pRepo
->
tsdbMeta
==
NULL
)
{
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
c3671a59
...
@@ -173,9 +173,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
...
@@ -173,9 +173,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
close
(
pHelper
->
files
.
nHeadF
.
fd
);
close
(
pHelper
->
files
.
nHeadF
.
fd
);
pHelper
->
files
.
nHeadF
.
fd
=
-
1
;
pHelper
->
files
.
nHeadF
.
fd
=
-
1
;
if
(
hasError
)
{
if
(
hasError
)
{
remove
(
pHelper
->
files
.
nHeadF
.
fname
);
(
void
)
remove
(
pHelper
->
files
.
nHeadF
.
fname
);
}
else
{
}
else
{
rename
(
pHelper
->
files
.
nHeadF
.
fname
,
pHelper
->
files
.
headF
.
fname
);
if
(
rename
(
pHelper
->
files
.
nHeadF
.
fname
,
pHelper
->
files
.
headF
.
fname
)
<
0
)
{
tsdbError
(
"vgId:%d failed to rename file from %s to %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
nHeadF
.
fname
,
pHelper
->
files
.
headF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pHelper
->
files
.
headF
.
info
=
pHelper
->
files
.
nHeadF
.
info
;
pHelper
->
files
.
headF
.
info
=
pHelper
->
files
.
nHeadF
.
info
;
}
}
}
}
...
@@ -186,9 +191,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
...
@@ -186,9 +191,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
close
(
pHelper
->
files
.
nLastF
.
fd
);
close
(
pHelper
->
files
.
nLastF
.
fd
);
pHelper
->
files
.
nLastF
.
fd
=
-
1
;
pHelper
->
files
.
nLastF
.
fd
=
-
1
;
if
(
hasError
)
{
if
(
hasError
)
{
remove
(
pHelper
->
files
.
nLastF
.
fname
);
(
void
)
remove
(
pHelper
->
files
.
nLastF
.
fname
);
}
else
{
}
else
{
rename
(
pHelper
->
files
.
nLastF
.
fname
,
pHelper
->
files
.
lastF
.
fname
);
if
(
rename
(
pHelper
->
files
.
nLastF
.
fname
,
pHelper
->
files
.
lastF
.
fname
)
<
0
)
{
tsdbError
(
"vgId:%d failed to rename file from %s to %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
nLastF
.
fname
,
pHelper
->
files
.
lastF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pHelper
->
files
.
lastF
.
info
=
pHelper
->
files
.
nLastF
.
info
;
pHelper
->
files
.
lastF
.
info
=
pHelper
->
files
.
nLastF
.
info
;
}
}
}
}
...
@@ -306,8 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
...
@@ -306,8 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
if
(
pCompBlock
->
numOfSubBlocks
>
1
)
{
if
(
pCompBlock
->
numOfSubBlocks
>
1
)
{
if
(
tsdbLoadBlockData
(
pHelper
,
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
),
NULL
)
<
0
)
return
-
1
;
if
(
tsdbLoadBlockData
(
pHelper
,
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
),
NULL
)
<
0
)
return
-
1
;
ASSERT
(
pHelper
->
pDataCols
[
0
]
->
numOfRows
>
0
&&
ASSERT
(
pHelper
->
pDataCols
[
0
]
->
numOfRows
>
0
&&
pHelper
->
pDataCols
[
0
]
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
);
pHelper
->
pDataCols
[
0
]
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
nLastF
),
pHelper
->
pDataCols
[
0
],
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
nLastF
),
pHelper
->
pDataCols
[
0
],
pHelper
->
pDataCols
[
0
]
->
numOfRows
,
&
compBlock
,
true
,
true
)
<
0
)
pHelper
->
pDataCols
[
0
]
->
numOfRows
,
&
compBlock
,
true
,
true
)
<
0
)
return
-
1
;
return
-
1
;
...
@@ -330,14 +339,27 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
...
@@ -330,14 +339,27 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
}
}
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
)
{
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
)
{
off_t
offset
=
0
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
if
(
pIdx
->
offset
>
0
)
{
if
(
pIdx
->
offset
>
0
)
{
pIdx
->
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
if
(
pIdx
->
offset
<
0
)
return
-
1
;
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseed file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pIdx
->
offset
=
offset
;
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
if
(
tsendfile
(
pHelper
->
files
.
nHeadF
.
fd
,
pHelper
->
files
.
headF
.
fd
,
NULL
,
pIdx
->
len
)
<
pIdx
->
len
)
return
-
1
;
if
(
tsendfile
(
pHelper
->
files
.
nHeadF
.
fd
,
pHelper
->
files
.
headF
.
fd
,
NULL
,
pIdx
->
len
)
<
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to send %d bytes from file %s to %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
pHelper
->
files
.
headF
.
fname
,
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
}
}
else
{
}
else
{
pHelper
->
pCompInfo
->
delimiter
=
TSDB_FILE_DELIMITER
;
pHelper
->
pCompInfo
->
delimiter
=
TSDB_FILE_DELIMITER
;
...
@@ -345,12 +367,23 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
...
@@ -345,12 +367,23 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pHelper
->
pCompInfo
->
checksum
=
0
;
pHelper
->
pCompInfo
->
checksum
=
0
;
ASSERT
((
pIdx
->
len
-
sizeof
(
SCompInfo
)
-
sizeof
(
TSCKSUM
))
%
sizeof
(
SCompBlock
)
==
0
);
ASSERT
((
pIdx
->
len
-
sizeof
(
SCompInfo
)
-
sizeof
(
TSCKSUM
))
%
sizeof
(
SCompBlock
)
==
0
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
pIdx
->
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pIdx
->
offset
=
offset
;
pIdx
->
uid
=
pHelper
->
tableInfo
.
uid
;
pIdx
->
uid
=
pHelper
->
tableInfo
.
uid
;
if
(
pIdx
->
offset
<
0
)
return
-
1
;
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
if
(
twrite
(
pHelper
->
files
.
nHeadF
.
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
return
-
1
;
if
(
twrite
(
pHelper
->
files
.
nHeadF
.
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
}
return
0
;
return
0
;
...
@@ -567,24 +600,24 @@ _err:
...
@@ -567,24 +600,24 @@ _err:
static
bool
tsdbShouldCreateNewLast
(
SRWHelper
*
pHelper
)
{
static
bool
tsdbShouldCreateNewLast
(
SRWHelper
*
pHelper
)
{
ASSERT
(
pHelper
->
files
.
lastF
.
fd
>
0
);
ASSERT
(
pHelper
->
files
.
lastF
.
fd
>
0
);
struct
stat
st
;
struct
stat
st
;
fstat
(
pHelper
->
files
.
lastF
.
fd
,
&
st
)
;
if
(
fstat
(
pHelper
->
files
.
lastF
.
fd
,
&
st
)
<
0
)
return
true
;
if
(
st
.
st_size
>
32
*
1024
+
TSDB_FILE_HEAD_SIZE
)
return
true
;
if
(
st
.
st_size
>
32
*
1024
+
TSDB_FILE_HEAD_SIZE
)
return
true
;
return
false
;
return
false
;
}
}
static
int
tsdbWriteBlockToFile
(
SRWHelper
*
pHelper
,
SFile
*
pFile
,
SDataCols
*
pDataCols
,
int
rowsToWrite
,
static
int
tsdbWriteBlockToFile
(
SRWHelper
*
pHelper
,
SFile
*
pFile
,
SDataCols
*
pDataCols
,
int
rowsToWrite
,
SCompBlock
*
pCompBlock
,
bool
isLast
,
bool
isSuperBlock
)
{
SCompBlock
*
pCompBlock
,
bool
isLast
,
bool
isSuperBlock
)
{
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
SCompData
*
pCompData
=
(
SCompData
*
)(
pHelper
->
pBuffer
);
SCompData
*
pCompData
=
(
SCompData
*
)(
pHelper
->
pBuffer
);
int64_t
offset
=
0
;
int64_t
offset
=
0
;
ASSERT
(
rowsToWrite
>
0
&&
rowsToWrite
<=
pDataCols
->
numOfRows
&&
rowsToWrite
<=
pCfg
->
maxRowsPerFileBlock
);
ASSERT
(
rowsToWrite
>
0
&&
rowsToWrite
<=
pDataCols
->
numOfRows
&&
rowsToWrite
<=
pCfg
->
maxRowsPerFileBlock
);
ASSERT
(
isLast
?
rowsToWrite
<
pCfg
->
minRowsPerFileBlock
:
true
);
ASSERT
(
isLast
?
rowsToWrite
<
pCfg
->
minRowsPerFileBlock
:
true
);
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to write block to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
tsdbError
(
"vgId:%d failed to write block to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
...
@@ -639,9 +672,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
...
@@ -639,9 +672,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
}
}
}
}
pCompCol
->
len
=
(
*
(
tDataTypeDesc
[
pDataCol
->
type
].
compFunc
))(
pCompCol
->
len
=
(
*
(
tDataTypeDesc
[
pDataCol
->
type
].
compFunc
))(
(
char
*
)
pDataCol
->
pData
,
tlen
,
rowsToWrite
,
tptr
,
(
char
*
)
pDataCol
->
pData
,
tlen
,
rowsToWrite
,
tptr
,
tsizeof
(
pHelper
->
pBuffer
)
-
lsize
,
pCfg
->
compression
,
tsizeof
(
pHelper
->
pBuffer
)
-
lsize
,
pCfg
->
compression
,
pHelper
->
compBuffer
,
tsizeof
(
pHelper
->
compBuffer
));
pHelper
->
compBuffer
,
tsizeof
(
pHelper
->
compBuffer
));
}
else
{
}
else
{
pCompCol
->
len
=
tlen
;
pCompCol
->
len
=
tlen
;
memcpy
(
tptr
,
pDataCol
->
pData
,
pCompCol
->
len
);
memcpy
(
tptr
,
pDataCol
->
pData
,
pCompCol
->
len
);
...
@@ -725,8 +758,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
...
@@ -725,8 +758,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
if
(
keyFirst
>
blockAtIdx
(
pHelper
,
blkIdx
)
->
keyLast
)
{
// Merge with the last block by append
if
(
keyFirst
>
blockAtIdx
(
pHelper
,
blkIdx
)
->
keyLast
)
{
// Merge with the last block by append
ASSERT
(
blockAtIdx
(
pHelper
,
blkIdx
)
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
&&
ASSERT
(
blockAtIdx
(
pHelper
,
blkIdx
)
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
&&
blkIdx
==
pIdx
->
numOfBlocks
-
1
);
blkIdx
==
pIdx
->
numOfBlocks
-
1
);
int
defaultRowsToWrite
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
// TODO: make a interface
int
defaultRowsToWrite
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
// TODO: make a interface
rowsWritten
=
MIN
((
defaultRowsToWrite
-
blockAtIdx
(
pHelper
,
blkIdx
)
->
numOfRows
),
pDataCols
->
numOfRows
);
rowsWritten
=
MIN
((
defaultRowsToWrite
-
blockAtIdx
(
pHelper
,
blkIdx
)
->
numOfRows
),
pDataCols
->
numOfRows
);
...
@@ -1051,7 +1083,7 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
...
@@ -1051,7 +1083,7 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
static
int
tsdbInitHelperFile
(
SRWHelper
*
pHelper
)
{
static
int
tsdbInitHelperFile
(
SRWHelper
*
pHelper
)
{
STsdbCfg
*
pCfg
=
&
pHelper
->
pRepo
->
config
;
STsdbCfg
*
pCfg
=
&
pHelper
->
pRepo
->
config
;
size_t
tsize
=
sizeof
(
SCompIdx
)
*
pCfg
->
maxTables
+
sizeof
(
TSCKSUM
);
size_t
tsize
=
sizeof
(
SCompIdx
)
*
pCfg
->
maxTables
+
sizeof
(
TSCKSUM
);
pHelper
->
pCompIdx
=
(
SCompIdx
*
)
tmalloc
(
tsize
);
pHelper
->
pCompIdx
=
(
SCompIdx
*
)
tmalloc
(
tsize
);
if
(
pHelper
->
pCompIdx
==
NULL
)
{
if
(
pHelper
->
pCompIdx
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
@@ -1099,10 +1131,8 @@ static int tsdbInitHelperBlock(SRWHelper *pHelper) {
...
@@ -1099,10 +1131,8 @@ static int tsdbInitHelperBlock(SRWHelper *pHelper) {
STsdbRepo
*
pRepo
=
helperRepo
(
pHelper
);
STsdbRepo
*
pRepo
=
helperRepo
(
pHelper
);
STsdbMeta
*
pMeta
=
pHelper
->
pRepo
->
tsdbMeta
;
STsdbMeta
*
pMeta
=
pHelper
->
pRepo
->
tsdbMeta
;
pHelper
->
pDataCols
[
0
]
=
pHelper
->
pDataCols
[
0
]
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pRepo
->
config
.
maxRowsPerFileBlock
);
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pRepo
->
config
.
maxRowsPerFileBlock
);
pHelper
->
pDataCols
[
1
]
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pRepo
->
config
.
maxRowsPerFileBlock
);
pHelper
->
pDataCols
[
1
]
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pRepo
->
config
.
maxRowsPerFileBlock
);
if
(
pHelper
->
pDataCols
[
0
]
==
NULL
||
pHelper
->
pDataCols
[
1
]
==
NULL
)
{
if
(
pHelper
->
pDataCols
[
0
]
==
NULL
||
pHelper
->
pDataCols
[
1
]
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
...
@@ -1222,12 +1252,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
...
@@ -1222,12 +1252,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
static
int
tsdbLoadBlockDataImpl
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SDataCols
*
pDataCols
)
{
static
int
tsdbLoadBlockDataImpl
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SDataCols
*
pDataCols
)
{
ASSERT
(
pCompBlock
->
numOfSubBlocks
<=
1
);
ASSERT
(
pCompBlock
->
numOfSubBlocks
<=
1
);
ASSERT
(
tsizeof
(
pHelper
->
pBuffer
)
>=
pCompBlock
->
len
);
SCompData
*
pCompData
=
(
SCompData
*
)
pHelper
->
pBuffer
;
SCompData
*
pCompData
=
(
SCompData
*
)
pHelper
->
pBuffer
;
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
&
(
pHelper
->
files
.
lastF
)
:
&
(
pHelper
->
files
.
dataF
);
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
&
(
pHelper
->
files
.
lastF
)
:
&
(
pHelper
->
files
.
dataF
);
pHelper
->
pBuffer
=
trealloc
(
pHelper
->
pBuffer
,
pCompBlock
->
len
);
if
(
pHelper
->
pBuffer
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
int
fd
=
pFile
->
fd
;
int
fd
=
pFile
->
fd
;
if
(
lseek
(
fd
,
pCompBlock
->
offset
,
SEEK_SET
)
<
0
)
{
if
(
lseek
(
fd
,
pCompBlock
->
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d tid:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
tableInfo
.
tid
,
tsdbError
(
"vgId:%d tid:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
tableInfo
.
tid
,
...
...
src/tsdb/tests/CMakeLists.txt
浏览文件 @
c3671a59
aux_source_directory
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
aux_source_directory
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
add_executable
(
tsdbTests
${
SOURCE_LIST
}
)
add_executable
(
tsdbTests
${
SOURCE_LIST
}
)
target_link_libraries
(
tsdbTests gtest gtest_main pthread common tsdb
)
target_link_libraries
(
tsdbTests gtest gtest_main pthread common tsdb
tutil trpc
)
add_test
(
NAME unit COMMAND
${
CMAKE_CURRENT_BINARY_DIR
}
/tsdbTests
)
add_test
(
NAME unit COMMAND
${
CMAKE_CURRENT_BINARY_DIR
}
/tsdbTests
)
\ No newline at end of file
src/tsdb/tests/tsdbTests.cpp
浏览文件 @
c3671a59
...
@@ -2,9 +2,8 @@
...
@@ -2,9 +2,8 @@
#include <stdlib.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/time.h>
#include "t
dataformat
.h"
#include "t
sdb
.h"
#include "tsdbMain.h"
#include "tsdbMain.h"
#include "tskiplist.h"
static
double
getCurTime
()
{
static
double
getCurTime
()
{
struct
timeval
tv
;
struct
timeval
tv
;
...
@@ -77,7 +76,7 @@ static int insertData(SInsertInfo *pInfo) {
...
@@ -77,7 +76,7 @@ static int insertData(SInsertInfo *pInfo) {
pMsg
->
numOfBlocks
=
htonl
(
pMsg
->
numOfBlocks
);
pMsg
->
numOfBlocks
=
htonl
(
pMsg
->
numOfBlocks
);
pMsg
->
compressed
=
htonl
(
pMsg
->
numOfBlocks
);
pMsg
->
compressed
=
htonl
(
pMsg
->
numOfBlocks
);
if
(
tsdbInsertData
(
pInfo
->
pRepo
,
pMsg
)
<
0
)
{
if
(
tsdbInsertData
(
pInfo
->
pRepo
,
pMsg
,
NULL
)
<
0
)
{
tfree
(
pMsg
);
tfree
(
pMsg
);
return
-
1
;
return
-
1
;
}
}
...
@@ -90,222 +89,72 @@ static int insertData(SInsertInfo *pInfo) {
...
@@ -90,222 +89,72 @@ static int insertData(SInsertInfo *pInfo) {
return
0
;
return
0
;
}
}
TEST
(
TsdbTest
,
DISABLED_tableEncodeDecode
)
{
static
void
tsdbSetCfg
(
STsdbCfg
*
pCfg
,
int32_t
tsdbId
,
int32_t
cacheBlockSize
,
int32_t
totalBlocks
,
int32_t
maxTables
,
// TEST(TsdbTest, tableEncodeDecode) {
int32_t
daysPerFile
,
int32_t
keep
,
int32_t
minRows
,
int32_t
maxRows
,
int8_t
precision
,
STable
*
pTable
=
(
STable
*
)
malloc
(
sizeof
(
STable
));
int8_t
compression
)
{
pCfg
->
tsdbId
=
tsdbId
;
pTable
->
type
=
TSDB_NORMAL_TABLE
;
pCfg
->
cacheBlockSize
=
cacheBlockSize
;
pTable
->
tableId
.
uid
=
987607499877672L
;
pCfg
->
totalBlocks
=
totalBlocks
;
pTable
->
tableId
.
tid
=
0
;
pCfg
->
maxTables
=
maxTables
;
pTable
->
superUid
=
-
1
;
pCfg
->
daysPerFile
=
daysPerFile
;
pTable
->
sversion
=
0
;
pCfg
->
keep
=
keep
;
pTable
->
tagSchema
=
NULL
;
pCfg
->
minRowsPerFileBlock
=
minRows
;
pTable
->
tagVal
=
NULL
;
pCfg
->
maxRowsPerFileBlock
=
maxRows
;
int
nCols
=
5
;
pCfg
->
precision
=
precision
;
STSchema
*
schema
=
tdNewSchema
(
nCols
);
pCfg
->
compression
=
compression
;
for
(
int
i
=
0
;
i
<
nCols
;
i
++
)
{
if
(
i
==
0
)
{
tdSchemaAddCol
(
schema
,
TSDB_DATA_TYPE_TIMESTAMP
,
i
,
-
1
);
}
else
{
tdSchemaAddCol
(
schema
,
TSDB_DATA_TYPE_INT
,
i
,
-
1
);
}
}
pTable
->
schema
=
schema
;
int
bufLen
=
0
;
void
*
buf
=
tsdbEncodeTable
(
pTable
,
&
bufLen
);
STable
*
tTable
=
tsdbDecodeTable
(
buf
,
bufLen
);
ASSERT_EQ
(
pTable
->
type
,
tTable
->
type
);
ASSERT_EQ
(
pTable
->
tableId
.
uid
,
tTable
->
tableId
.
uid
);
ASSERT_EQ
(
pTable
->
tableId
.
tid
,
tTable
->
tableId
.
tid
);
ASSERT_EQ
(
pTable
->
superUid
,
tTable
->
superUid
);
ASSERT_EQ
(
pTable
->
sversion
,
tTable
->
sversion
);
ASSERT_EQ
(
memcmp
(
pTable
->
schema
,
tTable
->
schema
,
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
nCols
),
0
);
}
}
// TEST(TsdbTest, DISABLED_createRepo) {
static
void
tsdbSetTableCfg
(
STableCfg
*
pCfg
)
{
TEST
(
TsdbTest
,
createRepo
)
{
STSchemaBuilder
schemaBuilder
=
{
0
};
STsdbCfg
config
;
STsdbRepo
*
repo
;
// 1. Create a tsdb repository
tsdbSetDefaultCfg
(
&
config
);
ASSERT_EQ
(
tsdbCreateRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
&
config
,
NULL
),
0
);
TSDB_REPO_T
*
pRepo
=
tsdbOpenRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
NULL
);
ASSERT_NE
(
pRepo
,
nullptr
);
// 2. Create a normal table
pCfg
->
type
=
TSDB_NORMAL_TABLE
;
STableCfg
tCfg
;
pCfg
->
superUid
=
TSDB_INVALID_SUPER_TABLE_ID
;
ASSERT_EQ
(
tsdbInitTableCfg
(
&
tCfg
,
TSDB_SUPER_TABLE
,
987607499877672L
,
0
),
-
1
)
;
pCfg
->
tableId
.
tid
=
1
;
ASSERT_EQ
(
tsdbInitTableCfg
(
&
tCfg
,
TSDB_NORMAL_TABLE
,
987607499877672L
,
0
),
0
)
;
pCfg
->
tableId
.
uid
=
5849583783847394
;
t
sdbTableSetName
(
&
tCfg
,
"test"
,
false
);
t
dInitTSchemaBuilder
(
&
schemaBuilder
,
0
);
int
nCols
=
5
;
int
colId
=
0
;
STSchema
*
schema
=
tdNewSchema
(
nCols
);
for
(
int
i
=
0
;
i
<
5
;
i
++
)
{
tdAddColToSchema
(
&
schemaBuilder
,
(
colId
==
0
)
?
TSDB_DATA_TYPE_TIMESTAMP
:
TSDB_DATA_TYPE_INT
,
colId
,
0
);
for
(
int
i
=
0
;
i
<
nCols
;
i
++
)
{
colId
++
;
if
(
i
==
0
)
{
tdSchemaAddCol
(
schema
,
TSDB_DATA_TYPE_TIMESTAMP
,
i
,
-
1
);
}
else
{
tdSchemaAddCol
(
schema
,
TSDB_DATA_TYPE_INT
,
i
,
-
1
);
}
}
}
tsdbTableSetSchema
(
&
tCfg
,
schema
,
true
);
pCfg
->
schema
=
tdGetSchemaFromBuilder
(
&
schemaBuilder
);
pCfg
->
name
=
strdup
(
"t1"
);
tsdbCreateTable
(
pRepo
,
&
tCfg
);
// Insert Some Data
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
SInsertInfo
iInfo
=
{
.
pRepo
=
pRepo
,
// .isAscend = true,
.
isAscend
=
false
,
.
tid
=
tCfg
.
tableId
.
tid
,
.
uid
=
tCfg
.
tableId
.
uid
,
.
sversion
=
tCfg
.
sversion
,
.
startTime
=
1584081000000
,
.
interval
=
1000
,
.
totalRows
=
10000000
,
.
rowsPerSubmit
=
1
,
.
pSchema
=
schema
};
ASSERT_EQ
(
insertData
(
&
iInfo
),
0
);
// Close the repository
tsdbCloseRepo
(
pRepo
);
// Open the repository again
pRepo
=
tsdbOpenRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
NULL
);
repo
=
(
STsdbRepo
*
)
pRepo
;
ASSERT_NE
(
pRepo
,
nullptr
);
// // Insert more data
// iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows;
// iInfo.totalRows = 10;
// iInfo.pRepo = pRepo;
// ASSERT_EQ(insertData(&iInfo), 0);
// // Close the repository
// tsdbCloseRepo(pRepo);
// // Open the repository again
// pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
// repo = (STsdbRepo *)pRepo;
// ASSERT_NE(pRepo, nullptr);
// // Read from file
// SRWHelper rhelper;
// tsdbInitReadHelper(&rhelper, repo);
// SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833);
// ASSERT_NE(pFGroup, nullptr);
// ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0);
// STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid);
// ASSERT_NE(pTable, nullptr);
// tsdbSetHelperTable(&rhelper, pTable, repo);
// ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0);
// ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0);
int
k
=
0
;
}
}
TEST
(
TsdbTest
,
DISABLED_openRepo
)
{
TEST
(
TsdbTest
,
testInsertSpeed
)
{
// TEST(TsdbTest, openRepo) {
int
vnode
=
1
;
// tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL);
int
ret
=
0
;
// ASSERT_NE(repo, nullptr);
STsdbCfg
tsdbCfg
;
STableCfg
tableCfg
;
// STsdbRepo *pRepo = (STsdbRepo *)repo;
std
::
string
testDir
=
"./test"
;
char
*
rootDir
=
strdup
((
testDir
+
"/vnode"
+
std
::
to_string
(
vnode
)).
c_str
());
// SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655);
// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbDebugFlag
=
131
;
//NOTE: you must set the flag
// tsdbOpenFile(&pGroup->files[type], O_RDONLY);
// }
// SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx));
taosRemoveDir
(
rootDir
);
// tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables);
// SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len);
// Create and open repository
tsdbSetCfg
(
&
tsdbCfg
,
1
,
16
,
4
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
);
tsdbCreateRepo
(
rootDir
,
&
tsdbCfg
);
TSDB_REPO_T
*
repo
=
tsdbOpenRepo
(
rootDir
,
NULL
);
ASSERT_NE
(
repo
,
nullptr
);
// tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo);
// Create table
tsdbSetTableCfg
(
&
tableCfg
);
tsdbCreateTable
(
repo
,
&
tableCfg
);
// int blockIdx = 0;
// Insert data
// SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx])
;
SInsertInfo
iInfo
=
{
repo
,
true
,
1
,
5849583783847394
,
0
,
1590000000000
,
10
,
10000000
,
100
,
tableCfg
.
schema
}
;
// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
insertData
(
&
iInfo
);
// tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData);
// STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
// SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5);
// tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable));
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData);
// tdResetDataCols(pDataCols);
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData);
// int k = 0;
}
tsdbCloseRepo
(
repo
,
1
);
TEST
(
TsdbTest
,
DISABLED_createFileGroup
)
{
SFileGroup
fGroup
;
// ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);
int
k
=
0
;
}
}
static
char
*
getTKey
(
const
void
*
data
)
{
static
char
*
getTKey
(
const
void
*
data
)
{
return
(
char
*
)
data
;
return
(
char
*
)
data
;
}
static
void
insertSkipList
(
bool
isAscend
)
{
TSKEY
start_time
=
1587393453000
;
TSKEY
interval
=
1000
;
SSkipList
*
pList
=
tSkipListCreate
(
5
,
TSDB_DATA_TYPE_TIMESTAMP
,
sizeof
(
TSKEY
),
0
,
0
,
1
,
getTKey
);
ASSERT_NE
(
pList
,
nullptr
);
for
(
size_t
i
=
0
;
i
<
20000000
;
i
++
)
{
TSKEY
time
=
isAscend
?
(
start_time
+
i
*
interval
)
:
(
start_time
-
i
*
interval
);
int32_t
level
=
0
;
int32_t
headSize
=
0
;
tSkipListNewNodeInfo
(
pList
,
&
level
,
&
headSize
);
SSkipListNode
*
pNode
=
(
SSkipListNode
*
)
malloc
(
headSize
+
sizeof
(
TSKEY
));
ASSERT_NE
(
pNode
,
nullptr
);
pNode
->
level
=
level
;
*
(
TSKEY
*
)((
char
*
)
pNode
+
headSize
)
=
time
;
tSkipListPut
(
pList
,
pNode
);
}
tSkipListDestroy
(
pList
);
}
TEST
(
TsdbTest
,
DISABLED_testSkipList
)
{
// TEST(TsdbTest, testSkipList) {
double
stime
=
getCurTime
();
insertSkipList
(
true
);
double
etime
=
getCurTime
();
printf
(
"Time used to insert 100000000 records takes %f seconds
\n
"
,
etime
-
stime
);
stime
=
getCurTime
();
insertSkipList
(
false
);
etime
=
getCurTime
();
printf
(
"Time used to insert 100000000 records takes %f seconds
\n
"
,
etime
-
stime
);
}
}
\ No newline at end of file
src/util/src/tkvstore.c
浏览文件 @
c3671a59
...
@@ -78,8 +78,8 @@ int tdCreateKVStore(char *fname) {
...
@@ -78,8 +78,8 @@ int tdCreateKVStore(char *fname) {
return
0
;
return
0
;
_err:
_err:
if
(
fd
>
0
)
close
(
fd
);
if
(
fd
>
=
0
)
close
(
fd
);
remove
(
fname
);
(
void
)
remove
(
fname
);
return
-
1
;
return
-
1
;
}
}
...
@@ -106,15 +106,15 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
...
@@ -106,15 +106,15 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
goto
_err
;
goto
_err
;
}
}
if
(
access
(
pStore
->
fsnap
,
F_OK
)
==
0
)
{
// .snap file exists
pStore
->
sfd
=
open
(
pStore
->
fsnap
,
O_RDONLY
);
uTrace
(
"file %s exists, try to recover the KV store"
,
pStore
->
fsnap
);
if
(
pStore
->
sfd
<
0
)
{
pStore
->
sfd
=
open
(
pStore
->
fsnap
,
O_RDONLY
);
if
(
errno
!=
ENOENT
)
{
if
(
pStore
->
sfd
<
0
)
{
uError
(
"failed to open file %s since %s"
,
pStore
->
fsnap
,
strerror
(
errno
));
uError
(
"failed to open file %s since %s"
,
pStore
->
fsnap
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
}
else
{
uTrace
(
"file %s exists, try to recover the KV store"
,
pStore
->
fsnap
);
if
(
tdLoadKVStoreHeader
(
pStore
->
sfd
,
pStore
->
fsnap
,
&
info
)
<
0
)
{
if
(
tdLoadKVStoreHeader
(
pStore
->
sfd
,
pStore
->
fsnap
,
&
info
)
<
0
)
{
if
(
terrno
!=
TSDB_CODE_COM_FILE_CORRUPTED
)
goto
_err
;
if
(
terrno
!=
TSDB_CODE_COM_FILE_CORRUPTED
)
goto
_err
;
}
else
{
}
else
{
...
@@ -133,7 +133,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
...
@@ -133,7 +133,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
close
(
pStore
->
sfd
);
close
(
pStore
->
sfd
);
pStore
->
sfd
=
-
1
;
pStore
->
sfd
=
-
1
;
remove
(
pStore
->
fsnap
);
(
void
)
remove
(
pStore
->
fsnap
);
}
}
if
(
tdLoadKVStoreHeader
(
pStore
->
fd
,
pStore
->
fname
,
&
info
)
<
0
)
goto
_err
;
if
(
tdLoadKVStoreHeader
(
pStore
->
fd
,
pStore
->
fname
,
&
info
)
<
0
)
goto
_err
;
...
@@ -212,7 +212,7 @@ _err:
...
@@ -212,7 +212,7 @@ _err:
if
(
pStore
->
sfd
>
0
)
{
if
(
pStore
->
sfd
>
0
)
{
close
(
pStore
->
sfd
);
close
(
pStore
->
sfd
);
pStore
->
sfd
=
-
1
;
pStore
->
sfd
=
-
1
;
remove
(
pStore
->
fsnap
);
(
void
)
remove
(
pStore
->
fsnap
);
}
}
if
(
pStore
->
fd
>
0
)
{
if
(
pStore
->
fd
>
0
)
{
close
(
pStore
->
fd
);
close
(
pStore
->
fd
);
...
@@ -314,7 +314,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
...
@@ -314,7 +314,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
}
}
pStore
->
fd
=
-
1
;
pStore
->
fd
=
-
1
;
remove
(
pStore
->
fsnap
);
(
void
)
remove
(
pStore
->
fsnap
);
return
0
;
return
0
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录