Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d5059564
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d5059564
编写于
4月 16, 2020
作者:
H
hzcheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-100
上级
53f1d0a2
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
213 addition
and
173 deletion
+213
-173
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+8
-10
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+4
-1
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+6
-1
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+105
-72
src/tsdb/tests/tsdbTests.cpp
src/tsdb/tests/tsdbTests.cpp
+90
-89
未找到文件。
src/tsdb/inc/tsdbMain.h
浏览文件 @
d5059564
...
@@ -351,6 +351,7 @@ typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
...
@@ -351,6 +351,7 @@ typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
typedef
struct
{
typedef
struct
{
tsdb_rw_helper_t
type
;
// helper type
tsdb_rw_helper_t
type
;
// helper type
int
maxTables
;
int
maxTables
;
int
maxRowSize
;
int
maxRowSize
;
int
maxRows
;
int
maxRows
;
...
@@ -388,17 +389,14 @@ typedef struct {
...
@@ -388,17 +389,14 @@ typedef struct {
// For file set usage
// For file set usage
SHelperFile
files
;
SHelperFile
files
;
SCompIdx
*
pCompIdx
;
SCompIdx
*
pCompIdx
;
// size_t compIdxSize;
// For table set usage
// For table set usage
SHelperTable
tableInfo
;
SHelperTable
tableInfo
;
SCompInfo
*
pCompInfo
;
SCompInfo
*
pCompInfo
;
// size_t compInfoSize;
bool
hasOldLastBlock
;
bool
hasOldLastBlock
;
// For block set usage
// For block set usage
SCompData
*
pCompData
;
SCompData
*
pCompData
;
// size_t compDataSize;
SDataCols
*
pDataCols
[
2
];
SDataCols
*
pDataCols
[
2
];
}
SRWHelper
;
}
SRWHelper
;
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
d5059564
...
@@ -25,6 +25,7 @@
...
@@ -25,6 +25,7 @@
#include "tutil.h"
#include "tutil.h"
#include "tsdbMain.h"
#include "tsdbMain.h"
#include "tchecksum.h"
const
char
*
tsdbFileSuffix
[]
=
{
const
char
*
tsdbFileSuffix
[]
=
{
".head"
,
// TSDB_FILE_TYPE_HEAD
".head"
,
// TSDB_FILE_TYPE_HEAD
...
@@ -310,7 +311,7 @@ static int tsdbWriteFileHead(SFile *pFile) {
...
@@ -310,7 +311,7 @@ static int tsdbWriteFileHead(SFile *pFile) {
}
}
static
int
tsdbWriteHeadFileIdx
(
SFile
*
pFile
,
int
maxTables
)
{
static
int
tsdbWriteHeadFileIdx
(
SFile
*
pFile
,
int
maxTables
)
{
int
size
=
sizeof
(
SCompIdx
)
*
maxTables
;
int
size
=
sizeof
(
SCompIdx
)
*
maxTables
+
sizeof
(
TSCKSUM
)
;
void
*
buf
=
calloc
(
1
,
size
);
void
*
buf
=
calloc
(
1
,
size
);
if
(
buf
==
NULL
)
return
-
1
;
if
(
buf
==
NULL
)
return
-
1
;
...
@@ -319,6 +320,8 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
...
@@ -319,6 +320,8 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
return
-
1
;
return
-
1
;
}
}
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
buf
,
size
);
if
(
write
(
pFile
->
fd
,
buf
,
size
)
<
0
)
{
if
(
write
(
pFile
->
fd
,
buf
,
size
)
<
0
)
{
free
(
buf
);
free
(
buf
);
return
-
1
;
return
-
1
;
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
d5059564
...
@@ -56,7 +56,8 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
...
@@ -56,7 +56,8 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
);
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
);
static
void
*
tsdbCommitData
(
void
*
arg
);
static
void
*
tsdbCommitData
(
void
*
arg
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SSkipListIterator
**
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SSkipListIterator
**
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
);
static
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
);
static
int
tsdbHasDataToCommit
(
SSkipListIterator
**
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbHasDataToCommit
(
SSkipListIterator
**
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len,
// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len,
...
@@ -847,6 +848,7 @@ static void *tsdbCommitData(void *arg) {
...
@@ -847,6 +848,7 @@ static void *tsdbCommitData(void *arg) {
.
maxRows
=
pCfg
->
maxRowsPerFileBlock
,
.
maxRows
=
pCfg
->
maxRowsPerFileBlock
,
.
maxCols
=
pMeta
->
maxCols
,
.
maxCols
=
pMeta
->
maxCols
,
.
minRowsPerFileBlock
=
pCfg
->
minRowsPerFileBlock
,
.
minRowsPerFileBlock
=
pCfg
->
minRowsPerFileBlock
,
.
maxRowsPerFileBlock
=
pCfg
->
maxRowsPerFileBlock
,
.
compress
=
2
// TODO make it a configuration
.
compress
=
2
// TODO make it a configuration
};
};
if
(
tsdbInitHelper
(
&
whelper
,
&
hcfg
)
<
0
)
goto
_exit
;
if
(
tsdbInitHelper
(
&
whelper
,
&
hcfg
)
<
0
)
goto
_exit
;
...
@@ -911,6 +913,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
...
@@ -911,6 +913,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
// Loop to commit data in each table
// Loop to commit data in each table
for
(
int
tid
=
0
;
tid
<
pCfg
->
maxTables
;
tid
++
)
{
for
(
int
tid
=
0
;
tid
<
pCfg
->
maxTables
;
tid
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
tid
];
STable
*
pTable
=
pMeta
->
tables
[
tid
];
if
(
pTable
==
NULL
)
continue
;
SSkipListIterator
*
pIter
=
iters
[
tid
];
SSkipListIterator
*
pIter
=
iters
[
tid
];
// Set the helper and the buffer dataCols object to help to write this table
// Set the helper and the buffer dataCols object to help to write this table
...
@@ -929,6 +933,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
...
@@ -929,6 +933,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
ASSERT
(
dataColsKeyLast
(
pDataCols
)
>=
minKey
&&
dataColsKeyLast
(
pDataCols
)
<=
maxKey
);
ASSERT
(
dataColsKeyLast
(
pDataCols
)
>=
minKey
&&
dataColsKeyLast
(
pDataCols
)
<=
maxKey
);
int
rowsWritten
=
tsdbWriteDataBlock
(
pHelper
,
pDataCols
);
int
rowsWritten
=
tsdbWriteDataBlock
(
pHelper
,
pDataCols
);
ASSERT
(
rowsWritten
!=
0
);
if
(
rowsWritten
<
0
)
goto
_err
;
if
(
rowsWritten
<
0
)
goto
_err
;
ASSERT
(
rowsWritten
<=
pDataCols
->
numOfPoints
);
ASSERT
(
rowsWritten
<=
pDataCols
->
numOfPoints
);
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
d5059564
...
@@ -14,15 +14,7 @@
...
@@ -14,15 +14,7 @@
*/
*/
#include "tsdbMain.h"
#include "tsdbMain.h"
#include "tchecksum.h"
#include "tchecksum.h"
#include "tscompression.h"
#define adjustMem(ptr, size, expectedSize) \
do { \
if ((size) < (expectedSize)) { \
(ptr) = realloc((void *)(ptr), (expectedSize)); \
if ((ptr) == NULL) return -1; \
(size) = (expectedSize); \
} \
} while (0)
// Local function definitions
// Local function definitions
static
int
tsdbCheckHelperCfg
(
SHelperCfg
*
pCfg
);
static
int
tsdbCheckHelperCfg
(
SHelperCfg
*
pCfg
);
...
@@ -33,12 +25,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pD
...
@@ -33,12 +25,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pD
SCompBlock
*
pCompBlock
,
bool
isLast
,
bool
isSuperBlock
);
SCompBlock
*
pCompBlock
,
bool
isLast
,
bool
isSuperBlock
);
static
int
compareKeyBlock
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
compareKeyBlock
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbMergeDataWithBlock
(
SRWHelper
*
pHelper
,
int
blkIdx
,
SDataCols
*
pDataCols
);
static
int
tsdbMergeDataWithBlock
(
SRWHelper
*
pHelper
,
int
blkIdx
,
SDataCols
*
pDataCols
);
// static int nRowsLEThan(SDataCols *pDataCols, int maxKey);
// static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
static
int
tsdbInsertSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
);
static
int
tsdbInsertSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
);
static
int
tsdbAddSubBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
,
int
rowsAdded
);
static
int
tsdbAddSubBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
,
int
rowsAdded
);
static
int
tsdbUpdateSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
);
static
int
tsdbUpdateSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
);
static
int
tsdbGetRowsInRange
(
SDataCols
*
pDataCols
,
int
minKey
,
int
maxKey
);
static
int
tsdbGetRowsInRange
(
SDataCols
*
pDataCols
,
int
minKey
,
int
maxKey
);
static
void
tsdbResetHelperBlock
(
SRWHelper
*
pHelper
);
// ---------- Operations on Helper File part
// ---------- Operations on Helper File part
static
void
tsdbResetHelperFileImpl
(
SRWHelper
*
pHelper
)
{
static
void
tsdbResetHelperFileImpl
(
SRWHelper
*
pHelper
)
{
...
@@ -72,6 +63,12 @@ static void tsdbResetHelperTableImpl(SRWHelper *pHelper) {
...
@@ -72,6 +63,12 @@ static void tsdbResetHelperTableImpl(SRWHelper *pHelper) {
pHelper
->
hasOldLastBlock
=
false
;
pHelper
->
hasOldLastBlock
=
false
;
}
}
static
void
tsdbResetHelperTable
(
SRWHelper
*
pHelper
)
{
tsdbResetHelperBlock
(
pHelper
);
tsdbResetHelperTableImpl
(
pHelper
);
helperClearState
(
pHelper
,
TSDB_HELPER_TABLE_SET
);
}
static
void
tsdbInitHelperTable
(
SRWHelper
*
pHelper
)
{
static
void
tsdbInitHelperTable
(
SRWHelper
*
pHelper
)
{
tsdbResetHelperTableImpl
(
pHelper
);
tsdbResetHelperTableImpl
(
pHelper
);
}
}
...
@@ -84,6 +81,10 @@ static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) {
...
@@ -84,6 +81,10 @@ static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) {
tdResetDataCols
(
pHelper
->
pDataCols
[
1
]);
tdResetDataCols
(
pHelper
->
pDataCols
[
1
]);
}
}
static
void
tsdbResetHelperBlock
(
SRWHelper
*
pHelper
)
{
// TODO
}
static
int
tsdbInitHelperBlock
(
SRWHelper
*
pHelper
)
{
static
int
tsdbInitHelperBlock
(
SRWHelper
*
pHelper
)
{
pHelper
->
pDataCols
[
0
]
=
tdNewDataCols
(
pHelper
->
config
.
maxRowSize
,
pHelper
->
config
.
maxCols
,
pHelper
->
config
.
maxRows
);
pHelper
->
pDataCols
[
0
]
=
tdNewDataCols
(
pHelper
->
config
.
maxRowSize
,
pHelper
->
config
.
maxCols
,
pHelper
->
config
.
maxRows
);
pHelper
->
pDataCols
[
1
]
=
tdNewDataCols
(
pHelper
->
config
.
maxRowSize
,
pHelper
->
config
.
maxCols
,
pHelper
->
config
.
maxRows
);
pHelper
->
pDataCols
[
1
]
=
tdNewDataCols
(
pHelper
->
config
.
maxRowSize
,
pHelper
->
config
.
maxCols
,
pHelper
->
config
.
maxRows
);
...
@@ -167,7 +168,6 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
...
@@ -167,7 +168,6 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
pHelper
->
files
.
lastF
=
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
];
pHelper
->
files
.
lastF
=
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
];
if
(
TSDB_HELPER_TYPE
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
if
(
TSDB_HELPER_TYPE
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
char
*
fnameDup
=
strdup
(
pHelper
->
files
.
headF
.
fname
);
char
*
fnameDup
=
strdup
(
pHelper
->
files
.
headF
.
fname
);
if
(
fnameDup
==
NULL
)
goto
_err
;
if
(
fnameDup
==
NULL
)
return
-
1
;
if
(
fnameDup
==
NULL
)
return
-
1
;
char
*
dataDir
=
dirname
(
fnameDup
);
char
*
dataDir
=
dirname
(
fnameDup
);
...
@@ -183,8 +183,8 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
...
@@ -183,8 +183,8 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
lastF
),
O_RDWR
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
lastF
),
O_RDWR
)
<
0
)
goto
_err
;
// Create and open .h
// Create and open .h
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
nHeadF
),
O_WRONLY
|
O_CREAT
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
nHeadF
),
O_WRONLY
|
O_CREAT
)
<
0
)
return
-
1
;
size_t
tsize
=
TSDB_FILE_HEAD_SIZE
+
sizeof
(
SCompIdx
)
*
pHelper
->
config
.
maxTables
;
size_t
tsize
=
TSDB_FILE_HEAD_SIZE
+
sizeof
(
SCompIdx
)
*
pHelper
->
config
.
maxTables
+
sizeof
(
TSCKSUM
)
;
if
(
tsendfile
(
pHelper
->
files
.
nHeadF
.
fd
,
pHelper
->
files
.
headF
.
fd
,
NULL
,
tsize
)
<
tsize
)
goto
_err
;
if
(
tsendfile
(
pHelper
->
files
.
nHeadF
.
fd
,
pHelper
->
files
.
headF
.
fd
,
NULL
,
tsize
)
<
tsize
)
goto
_err
;
// Create and open .l file if should
// Create and open .l file if should
...
@@ -221,23 +221,33 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
...
@@ -221,23 +221,33 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if
(
pHelper
->
files
.
nHeadF
.
fd
>
0
)
{
if
(
pHelper
->
files
.
nHeadF
.
fd
>
0
)
{
close
(
pHelper
->
files
.
nHeadF
.
fd
);
close
(
pHelper
->
files
.
nHeadF
.
fd
);
pHelper
->
files
.
nHeadF
.
fd
=
-
1
;
pHelper
->
files
.
nHeadF
.
fd
=
-
1
;
if
(
hasError
)
remove
(
pHelper
->
files
.
nHeadF
.
fname
);
if
(
hasError
)
{
remove
(
pHelper
->
files
.
nHeadF
.
fname
);
}
else
{
rename
(
pHelper
->
files
.
nHeadF
.
fname
,
pHelper
->
files
.
headF
.
fname
);
pHelper
->
files
.
headF
.
info
=
pHelper
->
files
.
nHeadF
.
info
;
}
}
}
if
(
pHelper
->
files
.
nLastF
.
fd
>
0
)
{
if
(
pHelper
->
files
.
nLastF
.
fd
>
0
)
{
close
(
pHelper
->
files
.
nLastF
.
fd
);
close
(
pHelper
->
files
.
nLastF
.
fd
);
pHelper
->
files
.
nLastF
.
fd
=
-
1
;
pHelper
->
files
.
nLastF
.
fd
=
-
1
;
if
(
hasError
)
remove
(
pHelper
->
files
.
nLastF
.
fname
);
if
(
hasError
)
{
remove
(
pHelper
->
files
.
nLastF
.
fname
);
}
else
{
rename
(
pHelper
->
files
.
nLastF
.
fname
,
pHelper
->
files
.
lastF
.
fname
);
pHelper
->
files
.
lastF
.
info
=
pHelper
->
files
.
nLastF
.
info
;
}
}
}
return
0
;
return
0
;
}
}
void
tsdbSetHelperTable
(
SRWHelper
*
pHelper
,
SHelperTable
*
pHelperTable
,
STSchema
*
pSchema
)
{
void
tsdbSetHelperTable
(
SRWHelper
*
pHelper
,
SHelperTable
*
pHelperTable
,
STSchema
*
pSchema
)
{
ASSERT
(
helperHasState
(
pHelper
,
TSDB_HELPER_FILE_SET_AND_OPEN
));
ASSERT
(
helperHasState
(
pHelper
,
TSDB_HELPER_FILE_SET_AND_OPEN
|
TSDB_HELPER_IDX_LOAD
));
// Clear members and state used by previous table
// Clear members and state used by previous table
// pHelper->blockIter = 0
;
tsdbResetHelperTable
(
pHelper
)
;
pHelper
->
state
&=
(
TSDB_HELPER_TABLE_SET
-
1
);
ASSERT
(
pHelper
->
state
==
(
TSDB_HELPER_FILE_SET_AND_OPEN
|
TSDB_HELPER_IDX_LOAD
)
);
pHelper
->
tableInfo
=
*
pHelperTable
;
pHelper
->
tableInfo
=
*
pHelperTable
;
tdInitDataCols
(
pHelper
->
pDataCols
[
0
],
pSchema
);
tdInitDataCols
(
pHelper
->
pDataCols
[
0
],
pSchema
);
...
@@ -248,9 +258,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema
...
@@ -248,9 +258,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema
pHelper
->
hasOldLastBlock
=
true
;
pHelper
->
hasOldLastBlock
=
true
;
}
}
// pHelper->compIdx = pHelper->pCompIdx[pHelper->tableInfo.tid];
helperSetState
(
pHelper
,
TSDB_HELPER_TABLE_SET
);
helperSetState
(
pHelper
,
TSDB_HELPER_TABLE_SET
);
ASSERT
(
pHelper
->
state
==
((
TSDB_HELPER_TABLE_SET
<<
1
)
-
1
));
}
}
/**
/**
...
@@ -268,7 +277,6 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
...
@@ -268,7 +277,6 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
TSKEY
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
TSKEY
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
ASSERT
(
helperHasState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
));
ASSERT
(
helperHasState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
));
// SCompIdx curIdx = pHelper->compIdx; // old table SCompIdx for sendfile usage
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
// for change purpose
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
// for change purpose
// Load the SCompInfo part if neccessary
// Load the SCompInfo part if neccessary
...
@@ -283,7 +291,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
...
@@ -283,7 +291,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
SFile
*
pWFile
=
NULL
;
SFile
*
pWFile
=
NULL
;
bool
isLast
=
false
;
bool
isLast
=
false
;
if
(
rowsToWrite
>
pHelper
->
config
.
minRowsPerFileBlock
)
{
if
(
rowsToWrite
>
=
pHelper
->
config
.
minRowsPerFileBlock
)
{
pWFile
=
&
(
pHelper
->
files
.
dataF
);
pWFile
=
&
(
pHelper
->
files
.
dataF
);
}
else
{
}
else
{
isLast
=
true
;
isLast
=
true
;
...
@@ -314,7 +322,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
...
@@ -314,7 +322,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
rowsToWrite
=
tsdbGetRowsInRange
(
pDataCols
,
0
,
pCompBlock
->
keyFirst
-
1
);
rowsToWrite
=
tsdbGetRowsInRange
(
pDataCols
,
0
,
pCompBlock
->
keyFirst
-
1
);
ASSERT
(
rowsToWrite
>
0
);
ASSERT
(
rowsToWrite
>
0
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
dataF
),
pDataCols
,
rowsToWrite
,
&
compBlock
,
false
,
true
)
<
0
)
goto
_err
;
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
dataF
),
pDataCols
,
rowsToWrite
,
&
compBlock
,
false
,
true
)
<
0
)
goto
_err
;
if
(
tsdbInsertSuperBlock
(
pHelper
,
pCompBlock
,
pCompBlock
-
pHelper
->
pCompInfo
->
blocks
)
<
0
)
goto
_err
;
if
(
tsdbInsertSuperBlock
(
pHelper
,
pCompBlock
,
blkIdx
)
<
0
)
goto
_err
;
}
}
}
}
}
}
...
@@ -367,6 +375,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
...
@@ -367,6 +375,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
if
(
tsendfile
(
pHelper
->
files
.
nHeadF
.
fd
,
pHelper
->
files
.
headF
.
fd
,
NULL
,
pIdx
->
len
)
<
pIdx
->
len
)
return
-
1
;
if
(
tsendfile
(
pHelper
->
files
.
nHeadF
.
fd
,
pHelper
->
files
.
headF
.
fd
,
NULL
,
pIdx
->
len
)
<
pIdx
->
len
)
return
-
1
;
}
}
}
else
{
}
else
{
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
pIdx
->
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
pIdx
->
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
if
(
pIdx
->
offset
<
0
)
return
-
1
;
if
(
pIdx
->
offset
<
0
)
return
-
1
;
...
@@ -379,6 +388,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
...
@@ -379,6 +388,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
)
{
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
)
{
if
(
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
return
-
1
;
if
(
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
return
-
1
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pCompIdx
,
tsizeof
(
pHelper
->
pCompIdx
));
if
(
twrite
(
pHelper
->
files
.
nHeadF
.
fd
,
(
void
*
)
pHelper
->
pCompIdx
,
tsizeof
(
pHelper
->
pCompIdx
))
<
tsizeof
(
pHelper
->
pCompIdx
))
if
(
twrite
(
pHelper
->
files
.
nHeadF
.
fd
,
(
void
*
)
pHelper
->
pCompIdx
,
tsizeof
(
pHelper
->
pCompIdx
))
<
tsizeof
(
pHelper
->
pCompIdx
))
return
-
1
;
return
-
1
;
return
0
;
return
0
;
...
@@ -392,8 +403,11 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
...
@@ -392,8 +403,11 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
int
fd
=
pHelper
->
files
.
headF
.
fd
;
int
fd
=
pHelper
->
files
.
headF
.
fd
;
if
(
lseek
(
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
return
-
1
;
if
(
lseek
(
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
return
-
1
;
if
(
tread
(
fd
,
(
void
*
)(
pHelper
->
pCompIdx
),
tsizeof
(
pHelper
->
pCompIdx
))
<
tsizeof
(
pHelper
->
pCompIdx
))
return
-
1
;
if
(
tread
(
fd
,
(
void
*
)(
pHelper
->
pCompIdx
),
tsizeof
((
void
*
)
pHelper
->
pCompIdx
))
<
tsizeof
(
pHelper
->
pCompIdx
))
return
-
1
;
// TODO: check the correctness of the part
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
pHelper
->
pCompIdx
),
tsizeof
((
void
*
)
pHelper
->
pCompIdx
)))
{
// TODO: File is broken, try to deal with it
return
-
1
;
}
}
}
helperSetState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
);
helperSetState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
);
...
@@ -408,19 +422,16 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
...
@@ -408,19 +422,16 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
// SCompIdx curCompIdx = pHelper->compIdx;
ASSERT
(
pIdx
->
offset
>
0
&&
pIdx
->
len
>
0
);
int
fd
=
pHelper
->
files
.
headF
.
fd
;
int
fd
=
pHelper
->
files
.
headF
.
fd
;
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
if
(
pIdx
->
offset
>
0
)
{
if
(
lseek
(
fd
,
pIdx
->
offset
,
SEEK_SET
)
<
0
)
return
-
1
;
if
(
lseek
(
fd
,
pIdx
->
offset
,
SEEK_SET
)
<
0
)
return
-
1
;
// adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len);
pHelper
->
pCompInfo
=
trealloc
((
void
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
pHelper
->
pCompInfo
=
trealloc
((
void
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
if
(
tread
(
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
return
-
1
;
if
(
tread
(
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
return
-
1
;
// TODO: check the checksum
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
))
return
-
1
;
}
helperSetState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
);
helperSetState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
);
}
}
...
@@ -519,15 +530,20 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
...
@@ -519,15 +530,20 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
int
fd
=
(
pCompBlock
->
last
)
?
pHelper
->
files
.
lastF
.
fd
:
pHelper
->
files
.
dataF
.
fd
;
int
fd
=
(
pCompBlock
->
last
)
?
pHelper
->
files
.
lastF
.
fd
:
pHelper
->
files
.
dataF
.
fd
;
if
(
tread
(
fd
,
(
void
*
)
pCompData
,
pCompBlock
->
len
)
<
pCompBlock
->
len
)
goto
_err
;
if
(
tread
(
fd
,
(
void
*
)
pCompData
,
pCompBlock
->
len
)
<
pCompBlock
->
len
)
goto
_err
;
ASSERT
(
pCompData
->
numOfCols
==
pCompBlock
->
numOfCols
);
{
// TODO : check the correctness of the part
// TODO : check the checksum
size_t
tsize
=
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pCompBlock
->
numOfCols
+
sizeof
(
TSCKSUM
);
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
pCompData
,
tsize
))
goto
_err
;
for
(
int
i
=
0
;
i
<
pCompData
->
numOfCols
;
i
++
)
{
// TODO: check the data checksum
// if (!taosCheckChecksumWhole())
}
}
ASSERT
(
pCompBlock
->
numOfCols
==
pCompData
->
numOfCols
);
ASSERT
(
pCompBlock
->
numOfCols
==
pCompData
->
numOfCols
);
pDataCols
->
numOfPoints
=
pCompBlock
->
numOfPoints
;
pDataCols
->
numOfPoints
=
pCompBlock
->
numOfPoints
;
size_t
tlen
=
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pCompBlock
->
numOfCols
;
int
ccol
=
0
,
dcol
=
0
;
int
ccol
=
0
,
dcol
=
0
;
while
(
true
)
{
while
(
true
)
{
if
(
ccol
>=
pDataCols
->
numOfCols
)
{
if
(
ccol
>=
pDataCols
->
numOfCols
)
{
...
@@ -541,7 +557,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
...
@@ -541,7 +557,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
if
(
pCompCol
->
colId
==
pDataCol
->
colId
)
{
if
(
pCompCol
->
colId
==
pDataCol
->
colId
)
{
// TODO: uncompress
// TODO: uncompress
memcpy
(
pDataCol
->
pData
,
(
void
*
)(((
char
*
)
pCompData
)
+
t
len
+
pCompCol
->
offset
),
pCompCol
->
len
);
memcpy
(
pDataCol
->
pData
,
(
void
*
)(((
char
*
)
pCompData
)
+
t
size
+
pCompCol
->
offset
),
pCompCol
->
len
);
ccol
++
;
ccol
++
;
dcol
++
;
dcol
++
;
}
else
if
(
pCompCol
->
colId
>
pDataCol
->
colId
)
{
}
else
if
(
pCompCol
->
colId
>
pDataCol
->
colId
)
{
...
@@ -567,8 +583,10 @@ int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target) {
...
@@ -567,8 +583,10 @@ int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target) {
int
numOfSubBlock
=
pCompBlock
->
numOfSubBlocks
;
int
numOfSubBlock
=
pCompBlock
->
numOfSubBlocks
;
if
(
numOfSubBlock
>
1
)
pCompBlock
=
(
SCompBlock
*
)((
char
*
)
pHelper
->
pCompInfo
+
pCompBlock
->
offset
);
if
(
numOfSubBlock
>
1
)
pCompBlock
=
(
SCompBlock
*
)((
char
*
)
pHelper
->
pCompInfo
+
pCompBlock
->
offset
);
tdResetDataCols
(
pHelper
->
pDataCols
[
0
]);
if
(
tsdbLoadBlockDataImpl
(
pHelper
,
pCompBlock
,
pHelper
->
pDataCols
[
0
])
<
0
)
goto
_err
;
if
(
tsdbLoadBlockDataImpl
(
pHelper
,
pCompBlock
,
pHelper
->
pDataCols
[
0
])
<
0
)
goto
_err
;
for
(
int
i
=
1
;
i
<
numOfSubBlock
;
i
++
)
{
for
(
int
i
=
1
;
i
<
numOfSubBlock
;
i
++
)
{
tdResetDataCols
(
pHelper
->
pDataCols
[
1
]);
pCompBlock
++
;
pCompBlock
++
;
if
(
tsdbLoadBlockDataImpl
(
pHelper
,
pCompBlock
,
pHelper
->
pDataCols
[
1
])
<
0
)
goto
_err
;
if
(
tsdbLoadBlockDataImpl
(
pHelper
,
pCompBlock
,
pHelper
->
pDataCols
[
1
])
<
0
)
goto
_err
;
if
(
tdMergeDataCols
(
pHelper
->
pDataCols
[
0
],
pHelper
->
pDataCols
[
1
],
pHelper
->
pDataCols
[
1
]
->
numOfPoints
)
<
0
)
goto
_err
;
if
(
tdMergeDataCols
(
pHelper
->
pDataCols
[
0
],
pHelper
->
pDataCols
[
1
],
pHelper
->
pDataCols
[
1
]
->
numOfPoints
)
<
0
)
goto
_err
;
...
@@ -614,8 +632,11 @@ static void tsdbClearHelperFile(SHelperFile *pHFile) {
...
@@ -614,8 +632,11 @@ static void tsdbClearHelperFile(SHelperFile *pHFile) {
}
}
static
bool
tsdbShouldCreateNewLast
(
SRWHelper
*
pHelper
)
{
static
bool
tsdbShouldCreateNewLast
(
SRWHelper
*
pHelper
)
{
// TODO
ASSERT
(
pHelper
->
files
.
lastF
.
fd
>
0
);
return
0
;
struct
stat
st
;
fstat
(
pHelper
->
files
.
lastF
.
fd
,
&
st
);
if
(
st
.
st_size
>
32
*
1024
+
TSDB_FILE_HEAD_SIZE
)
return
true
;
return
false
;
}
}
static
int
tsdbWriteBlockToFile
(
SRWHelper
*
pHelper
,
SFile
*
pFile
,
SDataCols
*
pDataCols
,
int
rowsToWrite
,
SCompBlock
*
pCompBlock
,
static
int
tsdbWriteBlockToFile
(
SRWHelper
*
pHelper
,
SFile
*
pFile
,
SDataCols
*
pDataCols
,
int
rowsToWrite
,
SCompBlock
*
pCompBlock
,
...
@@ -629,7 +650,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
...
@@ -629,7 +650,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
goto
_err
;
if
(
offset
<
0
)
goto
_err
;
pCompData
=
(
SCompData
*
)
malloc
(
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pDataCols
->
numOfCols
);
pCompData
=
(
SCompData
*
)
malloc
(
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pDataCols
->
numOfCols
+
sizeof
(
TSCKSUM
)
);
if
(
pCompData
==
NULL
)
goto
_err
;
if
(
pCompData
==
NULL
)
goto
_err
;
int
nColsNotAllNull
=
0
;
int
nColsNotAllNull
=
0
;
...
@@ -639,12 +660,14 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
...
@@ -639,12 +660,14 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
SCompCol
*
pCompCol
=
pCompData
->
cols
+
nColsNotAllNull
;
SCompCol
*
pCompCol
=
pCompData
->
cols
+
nColsNotAllNull
;
if
(
0
)
{
if
(
0
)
{
// TODO: all data are NULL
// TODO: all data
to commit
are NULL
continue
;
continue
;
}
}
// Compress the data here
// Compress the data here
{}
{
// TODO
}
pCompCol
->
colId
=
pDataCol
->
colId
;
pCompCol
->
colId
=
pDataCol
->
colId
;
pCompCol
->
type
=
pDataCol
->
type
;
pCompCol
->
type
=
pDataCol
->
type
;
...
@@ -655,14 +678,17 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
...
@@ -655,14 +678,17 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
toffset
+=
pCompCol
->
len
;
toffset
+=
pCompCol
->
len
;
}
}
ASSERT
(
nColsNotAllNull
>
0
);
ASSERT
(
nColsNotAllNull
>
0
&&
nColsNotAllNull
<=
pDataCols
->
numOfCols
);
pCompData
->
delimiter
=
TSDB_FILE_DELIMITER
;
pCompData
->
delimiter
=
TSDB_FILE_DELIMITER
;
pCompData
->
uid
=
pHelper
->
tableInfo
.
uid
;
pCompData
->
uid
=
pHelper
->
tableInfo
.
uid
;
pCompData
->
numOfCols
=
nColsNotAllNull
;
pCompData
->
numOfCols
=
nColsNotAllNull
;
size_t
tsize
=
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
nColsNotAllNull
;
// Write SCompData + SCompCol part
size_t
tsize
=
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
nColsNotAllNull
+
sizeof
(
TSCKSUM
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pCompData
,
tsize
);
if
(
twrite
(
pFile
->
fd
,
(
void
*
)
pCompData
,
tsize
)
<
tsize
)
goto
_err
;
if
(
twrite
(
pFile
->
fd
,
(
void
*
)
pCompData
,
tsize
)
<
tsize
)
goto
_err
;
// Write true data part
int
nCompCol
=
0
;
int
nCompCol
=
0
;
for
(
int
ncol
=
0
;
ncol
<
pDataCols
->
numOfCols
;
ncol
++
)
{
for
(
int
ncol
=
0
;
ncol
<
pDataCols
->
numOfCols
;
ncol
++
)
{
ASSERT
(
nCompCol
<
nColsNotAllNull
);
ASSERT
(
nCompCol
<
nColsNotAllNull
);
...
@@ -679,7 +705,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
...
@@ -679,7 +705,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
pCompBlock
->
last
=
isLast
;
pCompBlock
->
last
=
isLast
;
pCompBlock
->
offset
=
offset
;
pCompBlock
->
offset
=
offset
;
pCompBlock
->
algorithm
=
2
;
// TODO
pCompBlock
->
algorithm
=
0
;
// TODO
pCompBlock
->
numOfPoints
=
rowsToWrite
;
pCompBlock
->
numOfPoints
=
rowsToWrite
;
pCompBlock
->
sversion
=
pHelper
->
tableInfo
.
sversion
;
pCompBlock
->
sversion
=
pHelper
->
tableInfo
.
sversion
;
pCompBlock
->
len
=
(
int32_t
)
tsize
;
pCompBlock
->
len
=
(
int32_t
)
tsize
;
...
@@ -713,18 +739,13 @@ static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) {
...
@@ -713,18 +739,13 @@ static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) {
return
((
*
(
TSKEY
*
)
arg1
)
-
(
*
(
TSKEY
*
)
arg2
));
return
((
*
(
TSKEY
*
)
arg1
)
-
(
*
(
TSKEY
*
)
arg2
));
}
}
// static int nRowsLEThan(SDataCols *pDataCols, int maxKey) {
// void *ptr = taosbsearch((void *)&maxKey, pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), compKeyFunc, TD_LE);
// if (ptr == NULL) return 0;
// return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1;
// }
// Merge the data with a block in file
// Merge the data with a block in file
static
int
tsdbMergeDataWithBlock
(
SRWHelper
*
pHelper
,
int
blkIdx
,
SDataCols
*
pDataCols
)
{
static
int
tsdbMergeDataWithBlock
(
SRWHelper
*
pHelper
,
int
blkIdx
,
SDataCols
*
pDataCols
)
{
// TODO: set pHelper->hasOldBlock
// TODO: set pHelper->hasOldBlock
int
rowsWritten
=
0
;
int
rowsWritten
=
0
;
SCompBlock
compBlock
=
{
0
};
SCompBlock
compBlock
=
{
0
};
ASSERT
(
pDataCols
->
numOfPoints
>
0
);
TSKEY
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
TSKEY
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
...
@@ -733,7 +754,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
...
@@ -733,7 +754,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
SCompBlock
*
pCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
blkIdx
;
SCompBlock
*
pCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
blkIdx
;
ASSERT
(
pCompBlock
->
numOfSubBlocks
>=
1
);
ASSERT
(
pCompBlock
->
numOfSubBlocks
>=
1
);
ASSERT
(
keyFirst
>=
pCompBlock
->
keyFirst
);
ASSERT
(
keyFirst
>=
pCompBlock
->
keyFirst
);
ASSERT
(
compareKeyBlock
((
void
*
)
&
keyFirst
,
(
void
*
)
pCompBlock
)
==
0
);
//
ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
if
(
keyFirst
>
pCompBlock
->
keyLast
)
{
// Merge the last block by append
if
(
keyFirst
>
pCompBlock
->
keyLast
)
{
// Merge the last block by append
ASSERT
(
pCompBlock
->
last
&&
pCompBlock
->
numOfPoints
<
pHelper
->
config
.
minRowsPerFileBlock
);
ASSERT
(
pCompBlock
->
last
&&
pCompBlock
->
numOfPoints
<
pHelper
->
config
.
minRowsPerFileBlock
);
...
@@ -870,9 +891,13 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) {
...
@@ -870,9 +891,13 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) {
ASSERT
(
spaceLeft
>=
0
);
ASSERT
(
spaceLeft
>=
0
);
if
(
spaceLeft
<
spaceNeeded
)
{
if
(
spaceLeft
<
spaceNeeded
)
{
size_t
tsize
=
tsizeof
(
pHelper
->
pCompInfo
)
+
sizeof
(
SCompBlock
)
*
16
;
size_t
tsize
=
tsizeof
(
pHelper
->
pCompInfo
)
+
sizeof
(
SCompBlock
)
*
16
;
if
(
tsizeof
(
pHelper
->
pCompInfo
)
==
0
)
tsize
+=
sizeof
(
SCompInfo
);
if
(
tsizeof
(
pHelper
->
pCompInfo
)
==
0
)
{
pIdx
->
len
=
sizeof
(
SCompData
)
+
sizeof
(
TSCKSUM
);
tsize
=
tsize
+
sizeof
(
SCompInfo
)
+
sizeof
(
TSCKSUM
);
}
pHelper
->
pCompInfo
=
(
SCompInfo
*
)
trealloc
(
pHelper
->
pCompInfo
,
tsize
);
pHelper
->
pCompInfo
=
(
SCompInfo
*
)
trealloc
(
pHelper
->
pCompInfo
,
tsize
);
if
(
pHelper
->
pCompInfo
==
NULL
)
return
-
1
;
}
}
return
0
;
return
0
;
...
@@ -881,20 +906,23 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) {
...
@@ -881,20 +906,23 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) {
static
int
tsdbInsertSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
)
{
static
int
tsdbInsertSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
)
{
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<=
pIdx
->
numOfSuperBlocks
);
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<=
pIdx
->
numOfSuperBlocks
);
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
1
);
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
1
);
// Adjust memory if no more room
// Adjust memory if no more room
if
(
tsdbAdjustInfoSizeIfNeeded
(
pHelper
,
sizeof
(
SCompBlock
))
<
0
)
goto
_err
;
if
(
tsdbAdjustInfoSizeIfNeeded
(
pHelper
,
sizeof
(
SCompBlock
))
<
0
)
goto
_err
;
// Insert the block
// Change the offset
if
(
blkIdx
<
pIdx
->
numOfSuperBlocks
)
{
for
(
int
i
=
0
;
i
<
pIdx
->
numOfSuperBlocks
;
i
++
)
{
SCompBlock
*
pTCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
blkIdx
;
SCompBlock
*
pTCompBlock
=
&
pHelper
->
pCompInfo
->
blocks
[
i
];
memmove
((
void
*
)(
pTCompBlock
+
1
),
(
void
*
)
pTCompBlock
,
pIdx
->
len
-
sizeof
(
SCompInfo
)
-
sizeof
(
SCompBlock
)
*
blkIdx
);
pTCompBlock
++
;
for
(
int
i
=
0
;
i
<
pIdx
->
numOfSuperBlocks
-
blkIdx
;
i
++
)
{
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
pTCompBlock
->
offset
+=
sizeof
(
SCompBlock
);
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
pTCompBlock
->
offset
+=
sizeof
(
SCompBlock
);
}
}
// Memmove if needed
int
tsize
=
pIdx
->
len
-
(
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
blkIdx
);
if
(
tsize
>
0
)
{
memmove
((
void
*
)((
char
*
)
pHelper
->
pCompInfo
+
sizeof
(
SCompData
)
+
sizeof
(
SCompBlock
)
*
(
blkIdx
+
1
)),
(
void
*
)((
char
*
)
pHelper
->
pCompInfo
+
sizeof
(
SCompData
)
+
sizeof
(
SCompBlock
)
*
blkIdx
),
tsize
);
}
}
pHelper
->
pCompInfo
->
blocks
[
blkIdx
]
=
*
pCompBlock
;
pHelper
->
pCompInfo
->
blocks
[
blkIdx
]
=
*
pCompBlock
;
...
@@ -905,7 +933,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
...
@@ -905,7 +933,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
return
0
;
return
0
;
_err:
_err:
return
-
1
;
return
-
1
;
}
}
...
@@ -913,6 +941,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
...
@@ -913,6 +941,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
0
);
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
0
);
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<
pIdx
->
numOfSuperBlocks
);
SCompBlock
*
pSCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
blkIdx
;
SCompBlock
*
pSCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
blkIdx
;
ASSERT
(
pSCompBlock
->
numOfSubBlocks
>=
1
&&
pSCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
);
ASSERT
(
pSCompBlock
->
numOfSubBlocks
>=
1
&&
pSCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
);
...
@@ -926,7 +956,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
...
@@ -926,7 +956,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
memmove
((
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pSCompBlock
->
offset
+
pSCompBlock
->
len
+
sizeof
(
SCompBlock
)),
memmove
((
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pSCompBlock
->
offset
+
pSCompBlock
->
len
+
sizeof
(
SCompBlock
)),
(
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pSCompBlock
->
offset
+
pSCompBlock
->
len
),
tsize
);
(
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pSCompBlock
->
offset
+
pSCompBlock
->
len
),
tsize
);
for
(
int
i
=
blkIdx
;
i
<
pIdx
->
numOfSuperBlocks
;
i
++
)
{
for
(
int
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfSuperBlocks
;
i
++
)
{
SCompBlock
*
pTCompBlock
=
&
pHelper
->
pCompInfo
->
blocks
[
i
];
SCompBlock
*
pTCompBlock
=
&
pHelper
->
pCompInfo
->
blocks
[
i
];
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
pTCompBlock
->
offset
+=
sizeof
(
SCompBlock
);
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
pTCompBlock
->
offset
+=
sizeof
(
SCompBlock
);
}
}
...
@@ -936,11 +966,15 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
...
@@ -936,11 +966,15 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
*
(
SCompBlock
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pSCompBlock
->
offset
+
pSCompBlock
->
len
)
=
*
pCompBlock
;
*
(
SCompBlock
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pSCompBlock
->
offset
+
pSCompBlock
->
len
)
=
*
pCompBlock
;
pSCompBlock
->
numOfSubBlocks
++
;
pSCompBlock
->
numOfSubBlocks
++
;
ASSERT
(
pSCompBlock
->
numOfSubBlocks
<=
TSDB_MAX_SUBBLOCKS
);
pSCompBlock
->
len
+=
sizeof
(
SCompBlock
);
pSCompBlock
->
len
+=
sizeof
(
SCompBlock
);
pSCompBlock
->
numOfPoints
+=
rowsAdded
;
pSCompBlock
->
keyFirst
=
MIN
(
pSCompBlock
->
keyFirst
,
pCompBlock
->
keyFirst
);
pSCompBlock
->
keyLast
=
MAX
(
pSCompBlock
->
keyLast
,
pCompBlock
->
keyLast
);
pIdx
->
len
+=
sizeof
(
SCompBlock
);
pIdx
->
len
+=
sizeof
(
SCompBlock
);
}
else
{
// Need to create two sub-blocks
}
else
{
// Need to create two sub-blocks
void
*
ptr
=
NULL
;
void
*
ptr
=
NULL
;
for
(
int
i
=
blkIdx
-
1
;
i
>=
0
;
i
--
)
{
for
(
int
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfSuperBlocks
;
i
++
)
{
SCompBlock
*
pTCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
i
;
SCompBlock
*
pTCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
i
;
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
{
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
{
ptr
=
(
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pTCompBlock
->
offset
+
pTCompBlock
->
len
);
ptr
=
(
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pTCompBlock
->
offset
+
pTCompBlock
->
len
);
...
@@ -948,8 +982,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
...
@@ -948,8 +982,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
}
}
}
}
if
(
ptr
==
NULL
)
if
(
ptr
==
NULL
)
ptr
=
(
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pIdx
->
len
-
sizeof
(
TSCKSUM
));
ptr
=
(
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
sizeof
(
SCompInfo
)
+
sizeof
(
SCompBlock
)
*
pIdx
->
numOfSuperBlocks
);
size_t
tsize
=
pIdx
->
len
-
((
char
*
)
ptr
-
(
char
*
)(
pHelper
->
pCompInfo
));
size_t
tsize
=
pIdx
->
len
-
((
char
*
)
ptr
-
(
char
*
)(
pHelper
->
pCompInfo
));
if
(
tsize
>
0
)
{
if
(
tsize
>
0
)
{
...
@@ -1040,5 +1073,5 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey) {
...
@@ -1040,5 +1073,5 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey) {
if
((
TSKEY
*
)
ptr2
-
(
TSKEY
*
)
ptr1
<
0
)
return
0
;
if
((
TSKEY
*
)
ptr2
-
(
TSKEY
*
)
ptr1
<
0
)
return
0
;
return
(
TSKEY
*
)
ptr2
-
(
TSKEY
*
)
ptr
1
;
return
(
(
TSKEY
*
)
ptr2
-
(
TSKEY
*
)
ptr1
)
+
1
;
}
}
\ No newline at end of file
src/tsdb/tests/tsdbTests.cpp
浏览文件 @
d5059564
...
@@ -48,98 +48,99 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) {
...
@@ -48,98 +48,99 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) {
ASSERT_EQ
(
memcmp
(
pTable
->
schema
,
tTable
->
schema
,
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
nCols
),
0
);
ASSERT_EQ
(
memcmp
(
pTable
->
schema
,
tTable
->
schema
,
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
nCols
),
0
);
}
}
TEST
(
TsdbTest
,
DISABLED_createRepo
)
{
// TEST(TsdbTest, DISABLED_createRepo) {
// TEST(TsdbTest, createRepo) {
TEST
(
TsdbTest
,
createRepo
)
{
// STsdbCfg config;
STsdbCfg
config
;
// // 1. Create a tsdb repository
// 1. Create a tsdb repository
// tsdbSetDefaultCfg(&config);
tsdbSetDefaultCfg
(
&
config
);
// tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL);
ASSERT_EQ
(
tsdbCreateRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
&
config
,
NULL
),
0
);
// ASSERT_NE(pRepo, nullptr);
tsdb_repo_t
*
pRepo
=
tsdbOpenRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
NULL
);
// // 2. Create a normal table
ASSERT_NE
(
pRepo
,
nullptr
);
// STableCfg tCfg;
// ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1);
// 2. Create a normal table
// ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0);
STableCfg
tCfg
;
ASSERT_EQ
(
tsdbInitTableCfg
(
&
tCfg
,
TSDB_SUPER_TABLE
,
987607499877672L
,
0
),
-
1
);
// int nCols = 5;
ASSERT_EQ
(
tsdbInitTableCfg
(
&
tCfg
,
TSDB_NORMAL_TABLE
,
987607499877672L
,
0
),
0
);
// STSchema *schema = tdNewSchema(nCols);
int
nCols
=
5
;
// for (int i = 0; i < nCols; i++) {
STSchema
*
schema
=
tdNewSchema
(
nCols
);
// if (i == 0) {
// tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
for
(
int
i
=
0
;
i
<
nCols
;
i
++
)
{
// } else {
if
(
i
==
0
)
{
// tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1);
tdSchemaAppendCol
(
schema
,
TSDB_DATA_TYPE_TIMESTAMP
,
i
,
-
1
);
// }
}
else
{
// }
tdSchemaAppendCol
(
schema
,
TSDB_DATA_TYPE_INT
,
i
,
-
1
);
}
// tsdbTableSetSchema(&tCfg, schema, true);
}
// tsdbCreateTable(pRepo, &tCfg);
tsdbTableSetSchema
(
&
tCfg
,
schema
,
true
);
// // // 3. Loop to write some simple data
tsdbCreateTable
(
pRepo
,
&
tCfg
);
// int nRows = 1;
// int rowsPerSubmit = 1;
// // 3. Loop to write some simple data
// int64_t start_time = 1584081000000;
int
nRows
=
10000000
;
int
rowsPerSubmit
=
10
;
// SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit);
int64_t
start_time
=
1584081000000
;
// double stime = getCurTime();
// for (int k = 0; k < nRows/rowsPerSubmit; k++) {
// memset((void *)pMsg, 0, sizeof(SSubmitMsg));
// SSubmitBlk *pBlock = pMsg->blocks;
// pBlock->uid = 987607499877672L;
// pBlock->tid = 0;
// pBlock->sversion = 0;
// pBlock->len = 0;
// for (int i = 0; i < rowsPerSubmit; i++) {
// // start_time += 1000;
// start_time += 1000;
// SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
// tdInitDataRow(row, schema);
// for (int j = 0; j < schemaNCols(schema); j++) {
SSubmitMsg
*
pMsg
=
(
SSubmitMsg
*
)
malloc
(
sizeof
(
SSubmitMsg
)
+
sizeof
(
SSubmitBlk
)
+
tdMaxRowBytesFromSchema
(
schema
)
*
rowsPerSubmit
);
// if (j == 0) { // Just for timestamp
// tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j));
// } else { // For int
// int val = 10;
// tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
// }
// }
// pBlock->len += dataRowLen(row);
// }
// pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
// pMsg->numOfBlocks = 1;
// pBlock->len = htonl(pBlock->len);
double
stime
=
getCurTime
();
// pBlock->numOfRows = htonl(pBlock->numOfRows);
// pBlock->uid = htobe64(pBlock->uid);
// pBlock->tid = htonl(pBlock->tid);
// pBlock->sversion = htonl(pBlock->sversion);
for
(
int
k
=
0
;
k
<
nRows
/
rowsPerSubmit
;
k
++
)
{
// pBlock->padding = htonl(pBlock->padding);
memset
((
void
*
)
pMsg
,
0
,
sizeof
(
SSubmitMsg
));
SSubmitBlk
*
pBlock
=
pMsg
->
blocks
;
pBlock
->
uid
=
987607499877672L
;
pBlock
->
tid
=
0
;
pBlock
->
sversion
=
0
;
pBlock
->
len
=
0
;
for
(
int
i
=
0
;
i
<
rowsPerSubmit
;
i
++
)
{
// start_time += 1000;
start_time
+=
1000
;
SDataRow
row
=
(
SDataRow
)(
pBlock
->
data
+
pBlock
->
len
);
tdInitDataRow
(
row
,
schema
);
for
(
int
j
=
0
;
j
<
schemaNCols
(
schema
);
j
++
)
{
if
(
j
==
0
)
{
// Just for timestamp
tdAppendColVal
(
row
,
(
void
*
)(
&
start_time
),
schemaColAt
(
schema
,
j
));
}
else
{
// For int
int
val
=
10
;
tdAppendColVal
(
row
,
(
void
*
)(
&
val
),
schemaColAt
(
schema
,
j
));
}
}
pBlock
->
len
+=
dataRowLen
(
row
);
}
pMsg
->
length
=
pMsg
->
length
+
sizeof
(
SSubmitBlk
)
+
pBlock
->
len
;
pMsg
->
numOfBlocks
=
1
;
pBlock
->
len
=
htonl
(
pBlock
->
len
);
pBlock
->
numOfRows
=
htonl
(
pBlock
->
numOfRows
);
pBlock
->
uid
=
htobe64
(
pBlock
->
uid
);
pBlock
->
tid
=
htonl
(
pBlock
->
tid
);
// pMsg->length = htonl(pMsg->length);
pBlock
->
sversion
=
htonl
(
pBlock
->
sversion
);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
pBlock
->
padding
=
htonl
(
pBlock
->
padding
);
// pMsg->compressed = htonl(pMsg->numOfBlocks);
// tsdbInsertData(pRepo, pMsg);
pMsg
->
length
=
htonl
(
pMsg
->
length
);
// }
pMsg
->
numOfBlocks
=
htonl
(
pMsg
->
numOfBlocks
);
pMsg
->
compressed
=
htonl
(
pMsg
->
numOfBlocks
);
// double etime = getCurTime();
tsdbInsertData
(
pRepo
,
pMsg
);
}
// void *ptr = malloc(150000);
double
etime
=
getCurTime
();
// free(ptr);
// printf("Spent %f seconds to write %d records\n", etime - stime, nRows);
void
*
ptr
=
malloc
(
150000
);
free
(
ptr
);
// tsdbCloseRepo(pRepo
);
printf
(
"Spent %f seconds to write %d records
\n
"
,
etime
-
stime
,
nRows
);
tsdbCloseRepo
(
pRepo
);
}
}
//
TEST(TsdbTest, DISABLED_openRepo) {
TEST
(
TsdbTest
,
DISABLED_openRepo
)
{
TEST
(
TsdbTest
,
openRepo
)
{
//
TEST(TsdbTest, openRepo) {
// tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL);
// tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL);
// ASSERT_NE(repo, nullptr);
// ASSERT_NE(repo, nullptr);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录