Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a03a9c06
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a03a9c06
编写于
3月 29, 2020
作者:
H
haojun Liao
提交者:
GitHub
3月 29, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1459 from taosdata/feature/2.0tsdb
Feature/2.0tsdb
上级
0d468d4a
ffb02d97
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
452 addition
and
190 deletion
+452
-190
src/common/inc/dataformat.h
src/common/inc/dataformat.h
+5
-2
src/common/src/dataformat.c
src/common/src/dataformat.c
+15
-0
src/util/src/tlist.c
src/util/src/tlist.c
+4
-5
src/vnode/tsdb/inc/tsdbFile.h
src/vnode/tsdb/inc/tsdbFile.h
+35
-7
src/vnode/tsdb/src/tsdbFile.c
src/vnode/tsdb/src/tsdbFile.c
+84
-79
src/vnode/tsdb/src/tsdbMain.c
src/vnode/tsdb/src/tsdbMain.c
+309
-97
未找到文件。
src/common/inc/dataformat.h
浏览文件 @
a03a9c06
...
...
@@ -119,19 +119,22 @@ typedef struct {
int
maxPoints
;
// max number of points
int
numOfPoints
;
int
numOfCols
;
// Total number of cols
int
sversion
;
// TODO: set sversion
void
*
buf
;
SDataCol
cols
[];
}
SDataCols
;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsKeyFirst(pCols) ((int64_t *)(keyCol(pCols)->pData))[0]
#define dataColsKeyLast(pCols) ((int64_t *)(keyCol(pCols)->pData))[(pCols)->numOfPoints - 1]
#define dataColsKeyAt(pCols, idx) ((int64_t *)(keyCol(pCols)->pData))[(idx)]
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0)
#define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1)
SDataCols
*
tdNewDataCols
(
int
maxRowSize
,
int
maxCols
,
int
maxRows
);
void
tdResetDataCols
(
SDataCols
*
pCols
);
void
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
);
void
tdFreeDataCols
(
SDataCols
*
pCols
);
void
tdAppendDataRowToDataCol
(
SDataRow
row
,
SDataCols
*
pCols
);
void
tdPopDataColsPoints
(
SDataCols
*
pCols
,
int
pointsToPop
);
#ifdef __cplusplus
}
...
...
src/common/src/dataformat.c
浏览文件 @
a03a9c06
...
...
@@ -353,6 +353,21 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
}
pCols
->
numOfPoints
++
;
}
// Pop pointsToPop points from the SDataCols
void
tdPopDataColsPoints
(
SDataCols
*
pCols
,
int
pointsToPop
)
{
int
pointsLeft
=
pCols
->
numOfPoints
-
pointsToPop
;
for
(
int
iCol
=
0
;
iCol
<
pCols
->
numOfCols
;
iCol
++
)
{
SDataCol
*
p_col
=
pCols
->
cols
+
iCol
;
if
(
p_col
->
len
>
0
)
{
p_col
->
len
=
TYPE_BYTES
[
p_col
->
type
]
*
pointsLeft
;
if
(
pointsLeft
>
0
)
{
memmove
((
void
*
)(
p_col
->
pData
),
(
void
*
)((
char
*
)(
p_col
->
pData
)
+
TYPE_BYTES
[
p_col
->
type
]
*
pointsToPop
),
p_col
->
len
);
}
}
}
pCols
->
numOfPoints
=
pointsLeft
;
}
/**
* Return the first part length of a data row for a schema
...
...
src/util/src/tlist.c
浏览文件 @
a03a9c06
...
...
@@ -138,11 +138,10 @@ SListNode *tdListPopNode(SList *list, SListNode *node) {
// Move all node elements from src to dst, the dst is assumed as an empty list
void
tdListMove
(
SList
*
src
,
SList
*
dst
)
{
// assert(dst->eleSize == src->eleSize);
dst
->
numOfEles
=
src
->
numOfEles
;
dst
->
head
=
src
->
head
;
dst
->
tail
=
src
->
tail
;
src
->
numOfEles
=
0
;
src
->
head
=
src
->
tail
=
NULL
;
SListNode
*
node
=
NULL
;
while
((
node
=
tdListPopHead
(
src
))
!=
NULL
)
{
tdListAppendNode
(
dst
,
node
);
}
}
void
tdListNodeGetData
(
SList
*
list
,
SListNode
*
node
,
void
*
target
)
{
memcpy
(
target
,
node
->
data
,
list
->
eleSize
);
}
...
...
src/vnode/tsdb/inc/tsdbFile.h
浏览文件 @
a03a9c06
...
...
@@ -25,6 +25,9 @@
extern
"C"
{
#endif
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
...
...
@@ -40,13 +43,16 @@ typedef enum {
extern
const
char
*
tsdbFileSuffix
[];
typedef
struct
{
int8_t
type
;
int
fd
;
char
fname
[
128
];
int64_t
size
;
// total size of the file
int64_t
tombSize
;
// unused file size
int32_t
totalBlocks
;
int32_t
totalSubBlocks
;
}
SFileInfo
;
typedef
struct
{
int
fd
;
char
fname
[
128
];
SFileInfo
info
;
}
SFile
;
#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1)
...
...
@@ -69,9 +75,10 @@ typedef struct {
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
int
maxFiles
);
void
tsdbCloseFileH
(
STsdbFileH
*
pFileH
);
int
tsdbCreateFile
(
char
*
dataDir
,
int
fileId
,
char
*
suffix
,
int
maxTables
,
SFile
*
pFile
,
int
writeHeader
,
int
toClose
);
int
tsdbCreateFGroup
(
STsdbFileH
*
pFileH
,
char
*
dataDir
,
int
fid
,
int
maxTables
);
int
tsdbOpenFile
(
SFile
*
pFile
,
int
oflag
);
SFileGroup
*
tsdbOpenFilesForCommit
(
STsdbFileH
*
pFileH
,
int
fid
);
int
tsdbCloseFile
(
SFile
*
pFile
);
SFileGroup
*
tsdbOpenFilesForCommit
(
STsdbFileH
*
pFileH
,
int
fid
);
int
tsdbRemoveFileGroup
(
STsdbFileH
*
pFile
,
int
fid
);
typedef
struct
{
...
...
@@ -104,6 +111,9 @@ typedef struct {
TSKEY
keyLast
;
}
SCompBlock
;
#define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1)
#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
checksum
;
// TODO: decide if checksum logic in this file or make it one API
...
...
@@ -111,8 +121,16 @@ typedef struct {
SCompBlock
blocks
[];
}
SCompInfo
;
int
tsdbLoadCompIdx
(
SFileGroup
*
pGroup
,
void
*
buf
,
int
maxTables
);
int
tsdbLoadCompBlocks
(
SFileGroup
*
pGroup
,
SCompIdx
*
pIdx
,
void
*
buf
);
#define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx))
#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size)\
do {\
if (pCompBlock->numOfSubBlocks > 1) {\
pCompBlock = pCompInfo->blocks + pCompBlock->offset;\
size = pCompBlock->numOfSubBlocks;\
} else {\
size = 1;\
}\
} while (0)
// TODO: take pre-calculation into account
typedef
struct
{
...
...
@@ -130,7 +148,17 @@ typedef struct {
SCompCol
cols
[];
}
SCompData
;
int
tsdbWriteBlockToFile
(
SFileGroup
*
pGroup
,
SCompInfo
*
pCompInfo
,
SCompIdx
*
pIdx
,
int
isMerge
,
SCompBlock
*
pBlock
,
SDataCols
*
pCols
);
int
tsdbCopyBlockDataInFile
(
SFile
*
pOutFile
,
SFile
*
pInFile
,
SCompInfo
*
pCompInfo
,
int
idx
,
int
isLast
,
SDataCols
*
pCols
);
int
tsdbLoadCompIdx
(
SFileGroup
*
pGroup
,
void
*
buf
,
int
maxTables
);
int
tsdbLoadCompBlocks
(
SFileGroup
*
pGroup
,
SCompIdx
*
pIdx
,
void
*
buf
);
int
tsdbLoadCompCols
(
SFile
*
pFile
,
SCompBlock
*
pBlock
,
void
*
buf
);
int
tsdbLoadColData
(
SFile
*
pFile
,
SCompCol
*
pCol
,
int64_t
blockBaseOffset
,
void
*
buf
);
int
tsdbLoadDataBlock
(
SFile
*
pFile
,
SCompBlock
*
pStartBlock
,
int
numOfBlocks
,
SDataCols
*
pCols
,
SCompData
*
pCompData
);
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
);
// TODO: need an API to merge all sub-block data into one
void
tsdbGetKeyRangeOfFileId
(
int32_t
daysPerFile
,
int8_t
precision
,
int32_t
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
#ifdef __cplusplus
...
...
src/vnode/tsdb/src/tsdbFile.c
浏览文件 @
a03a9c06
...
...
@@ -24,9 +24,6 @@
#include "tsdbFile.h"
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
const
char
*
tsdbFileSuffix
[]
=
{
".head"
,
// TSDB_FILE_TYPE_HEAD
".data"
,
// TSDB_FILE_TYPE_DATA
...
...
@@ -35,11 +32,9 @@ const char *tsdbFileSuffix[] = {
static
int
compFGroupKey
(
const
void
*
key
,
const
void
*
fgroup
);
static
int
compFGroup
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbGetFileName
(
char
*
dataDir
,
int
fileId
,
int8_t
type
,
char
*
fname
);
static
int
tsdbCreateFile
(
char
*
dataDir
,
int
fileId
,
int8_t
type
,
int
maxTables
,
SFile
*
pFile
);
static
int
tsdbGetFileName
(
char
*
dataDir
,
int
fileId
,
char
*
suffix
,
char
*
fname
);
static
int
tsdbWriteFileHead
(
SFile
*
pFile
);
static
int
tsdbWriteHeadFileIdx
(
SFile
*
pFile
,
int
maxTables
);
static
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
);
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
int
maxFiles
)
{
STsdbFileH
*
pFileH
=
(
STsdbFileH
*
)
calloc
(
1
,
sizeof
(
STsdbFileH
)
+
sizeof
(
SFileGroup
)
*
maxFiles
);
...
...
@@ -71,10 +66,10 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables)
SFileGroup
fGroup
;
SFileGroup
*
pFGroup
=
&
fGroup
;
if
(
tsdbSearchFGroup
(
pFileH
,
fid
)
==
NULL
)
{
if
(
tsdbSearchFGroup
(
pFileH
,
fid
)
==
NULL
)
{
// if not exists, create one
pFGroup
->
fileId
=
fid
;
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
tsdbCreateFile
(
dataDir
,
fid
,
t
ype
,
maxTables
,
&
(
pFGroup
->
files
[
type
])
)
<
0
)
{
if
(
tsdbCreateFile
(
dataDir
,
fid
,
t
sdbFileSuffix
[
type
],
maxTables
,
&
(
pFGroup
->
files
[
type
]),
type
==
TSDB_FILE_TYPE_HEAD
?
1
:
0
,
1
)
<
0
)
{
// TODO: deal with the ERROR here, remove those creaed file
return
-
1
;
}
...
...
@@ -106,6 +101,61 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
return
0
;
}
int
tsdbLoadDataBlock
(
SFile
*
pFile
,
SCompBlock
*
pStartBlock
,
int
numOfBlocks
,
SDataCols
*
pCols
,
SCompData
*
pCompData
)
{
SCompBlock
*
pBlock
=
pStartBlock
;
for
(
int
i
=
0
;
i
<
numOfBlocks
;
i
++
)
{
if
(
tsdbLoadCompCols
(
pFile
,
pBlock
,
(
void
*
)
pCompData
)
<
0
)
return
-
1
;
for
(
int
iCol
=
0
;
iCol
<
pBlock
->
numOfCols
;
iCol
++
)
{
SCompCol
*
pCompCol
=
&
(
pCompData
->
cols
[
iCol
]);
pCols
->
numOfPoints
+=
pBlock
->
numOfPoints
;
int
k
=
0
;
for
(;
k
<
pCols
->
numOfCols
;
k
++
)
{
if
(
pCompCol
->
colId
==
pCols
->
cols
[
k
].
colId
)
break
;
}
if
(
tsdbLoadColData
(
pFile
,
pCompCol
,
pBlock
->
offset
,
(
void
*
)((
char
*
)(
pCols
->
cols
[
k
].
pData
)
+
pCols
->
cols
[
k
].
len
))
<
0
)
return
-
1
;
}
pStartBlock
++
;
}
return
0
;
}
int
tsdbCopyBlockDataInFile
(
SFile
*
pOutFile
,
SFile
*
pInFile
,
SCompInfo
*
pCompInfo
,
int
idx
,
int
isLast
,
SDataCols
*
pCols
)
{
SCompBlock
*
pSuperBlock
=
TSDB_COMPBLOCK_AT
(
pCompInfo
,
idx
);
SCompBlock
*
pStartBlock
=
NULL
;
SCompBlock
*
pBlock
=
NULL
;
int
numOfBlocks
=
pSuperBlock
->
numOfSubBlocks
;
if
(
numOfBlocks
==
1
)
pStartBlock
=
pSuperBlock
;
else
pStartBlock
=
TSDB_COMPBLOCK_AT
(
pCompInfo
,
pSuperBlock
->
offset
);
int
maxNumOfCols
=
0
;
pBlock
=
pStartBlock
;
for
(
int
i
=
0
;
i
<
numOfBlocks
;
i
++
)
{
if
(
pBlock
->
numOfCols
>
maxNumOfCols
)
maxNumOfCols
=
pBlock
->
numOfCols
;
pBlock
++
;
}
SCompData
*
pCompData
=
(
SCompData
*
)
malloc
(
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
maxNumOfCols
);
if
(
pCompData
==
NULL
)
return
-
1
;
// Load data from the block
if
(
tsdbLoadDataBlock
(
pOutFile
,
pStartBlock
,
numOfBlocks
,
pCols
,
pCompData
));
// Write data block to the file
{
// TODO
}
if
(
pCompData
)
free
(
pCompData
);
return
0
;
}
int
tsdbLoadCompIdx
(
SFileGroup
*
pGroup
,
void
*
buf
,
int
maxTables
)
{
SFile
*
pFile
=
&
(
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
]);
if
(
lseek
(
pFile
->
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
return
-
1
;
...
...
@@ -127,62 +177,19 @@ int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
return
0
;
}
static
int
tsdbWriteBlockToFileImpl
(
SFile
*
pFile
,
// File to write
SDataCols
*
pCols
,
// Data column buffer
int
numOfPointsToWrie
,
// Number of points to write to the file
SCompBlock
*
pBlock
// SCompBlock to hold block information to return
)
{
// pBlock->last = 0;
// pBlock->offset = lseek(pFile->fd, 0, SEEK_END);
// // pBlock->algorithm = ;
// pBlock->numOfPoints = pCols->numOfPoints;
// // pBlock->sversion = ;
// // pBlock->len = ;
// pBlock->numOfSubBlocks = 1;
// pBlock->keyFirst = dataColsKeyFirst(pCols);
// pBlock->keyLast = dataColsKeyLast(pCols);
// for (int i = 0; i < pCols->numOfCols; i++) {
// // TODO: if all col value is NULL, do not save it
// pBlock->numOfCols++;
// pCompData->numOfCols++;
// SCompCol *pCompCol = pCompData->cols + i;
// pCompCol->colId = pCols->cols[i].colId;
// pCompCol->type = pCols->cols[i].type;
// // pCompCol->len = ;
// // pCompCol->offset = ;
// }
int
tsdbLoadCompCols
(
SFile
*
pFile
,
SCompBlock
*
pBlock
,
void
*
buf
)
{
// assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
if
(
lseek
(
pFile
->
fd
,
pBlock
->
offset
,
SEEK_SET
)
<
0
)
return
-
1
;
size_t
size
=
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pBlock
->
numOfCols
;
if
(
read
(
pFile
->
fd
,
buf
,
size
)
<
0
)
return
-
1
;
return
0
;
}
int
tsdbWriteBlockToFile
(
SFileGroup
*
pGroup
,
SCompInfo
*
pCompInfo
,
SCompIdx
*
pIdx
,
int
isMerge
,
SCompBlock
*
pBlock
,
SDataCols
*
pCols
)
{
memset
((
void
*
)
pBlock
,
0
,
sizeof
(
SCompBlock
));
SFile
*
pFile
=
NULL
;
SCompData
*
pCompData
=
(
SCompData
*
)
malloc
(
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pCols
->
numOfCols
);
if
(
pCompData
==
NULL
)
return
-
1
;
pCompData
->
delimiter
=
TSDB_FILE_DELIMITER
;
// pCompData->uid = ;
if
(
isMerge
)
{
TSKEY
keyFirst
=
dataColsKeyFirst
(
pCols
);
// 1. Binary search the block the data can merged into
if
(
1
/* the data should only merged into last file */
)
{
}
else
{
}
}
else
{
// Write directly to the file without merge
if
(
1
/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/
)
{
// TODO: write the data to the last file
}
else
{
// TODO: wirte the data to the data file
}
}
// TODO: need to update pIdx
if
(
pCompData
)
free
(
pCompData
);
int
tsdbLoadColData
(
SFile
*
pFile
,
SCompCol
*
pCol
,
int64_t
blockBaseOffset
,
void
*
buf
)
{
if
(
lseek
(
pFile
->
fd
,
blockBaseOffset
+
pCol
->
offset
,
SEEK_SET
)
<
0
)
return
-
1
;
if
(
read
(
pFile
->
fd
,
buf
,
pCol
->
len
)
<
0
)
return
-
1
;
return
0
;
}
...
...
@@ -199,7 +206,7 @@ static int compFGroup(const void *arg1, const void *arg2) {
static
int
tsdbWriteFileHead
(
SFile
*
pFile
)
{
char
head
[
TSDB_FILE_HEAD_SIZE
]
=
"
\0
"
;
pFile
->
size
+=
TSDB_FILE_HEAD_SIZE
;
pFile
->
info
.
size
+=
TSDB_FILE_HEAD_SIZE
;
// TODO: write version and File statistic to the head
lseek
(
pFile
->
fd
,
0
,
SEEK_SET
);
...
...
@@ -223,16 +230,16 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
return
-
1
;
}
pFile
->
size
+=
size
;
pFile
->
info
.
size
+=
size
;
free
(
buf
);
return
0
;
}
static
int
tsdbGetFileName
(
char
*
dataDir
,
int
fileId
,
int8_t
type
,
char
*
fname
)
{
if
(
dataDir
==
NULL
||
fname
==
NULL
||
!
IS_VALID_TSDB_FILE_TYPE
(
type
)
)
return
-
1
;
static
int
tsdbGetFileName
(
char
*
dataDir
,
int
fileId
,
char
*
suffix
,
char
*
fname
)
{
if
(
dataDir
==
NULL
||
fname
==
NULL
)
return
-
1
;
sprintf
(
fname
,
"%s/f%d%s"
,
dataDir
,
fileId
,
tsdbFileSuffix
[
type
]
);
sprintf
(
fname
,
"%s/f%d%s"
,
dataDir
,
fileId
,
suffix
);
return
0
;
}
...
...
@@ -246,6 +253,12 @@ int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function
return
0
;
}
int
tsdbCloseFile
(
SFile
*
pFile
)
{
int
ret
=
close
(
pFile
->
fd
);
pFile
->
fd
=
-
1
;
return
ret
;
}
SFileGroup
*
tsdbOpenFilesForCommit
(
STsdbFileH
*
pFileH
,
int
fid
)
{
SFileGroup
*
pGroup
=
tsdbSearchFGroup
(
pFileH
,
fid
);
if
(
pGroup
==
NULL
)
return
NULL
;
...
...
@@ -256,20 +269,12 @@ SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
return
pGroup
;
}
static
int
tsdbCloseFile
(
SFile
*
pFile
)
{
if
(
!
TSDB_IS_FILE_OPENED
(
pFile
))
return
-
1
;
int
ret
=
close
(
pFile
->
fd
);
pFile
->
fd
=
-
1
;
return
ret
;
}
static
int
tsdbCreateFile
(
char
*
dataDir
,
int
fileId
,
int8_t
type
,
int
maxTables
,
SFile
*
pFile
)
{
int
tsdbCreateFile
(
char
*
dataDir
,
int
fileId
,
char
*
suffix
,
int
maxTables
,
SFile
*
pFile
,
int
writeHeader
,
int
toClose
)
{
memset
((
void
*
)
pFile
,
0
,
sizeof
(
SFile
));
pFile
->
type
=
type
;
pFile
->
fd
=
-
1
;
tsdbGetFileName
(
dataDir
,
fileId
,
type
,
pFile
->
fname
);
tsdbGetFileName
(
dataDir
,
fileId
,
suffix
,
pFile
->
fname
);
if
(
access
(
pFile
->
fname
,
F_OK
)
==
0
)
{
// File already exists
return
-
1
;
...
...
@@ -280,7 +285,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
return
-
1
;
}
if
(
type
==
TSDB_FILE_TYPE_HEAD
)
{
if
(
writeHeader
)
{
if
(
tsdbWriteHeadFileIdx
(
pFile
,
maxTables
)
<
0
)
{
tsdbCloseFile
(
pFile
);
return
-
1
;
...
...
@@ -292,7 +297,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
return
-
1
;
}
tsdbCloseFile
(
pFile
);
if
(
toClose
)
tsdbCloseFile
(
pFile
);
return
0
;
}
...
...
@@ -303,7 +308,7 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file
*
maxKey
=
*
minKey
+
daysPerFile
*
tsMsPerDay
[
precision
]
-
1
;
}
static
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
)
{
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
)
{
if
(
pFileH
->
numOfFGroups
==
0
||
fid
<
pFileH
->
fGroup
[
0
].
fileId
||
fid
>
pFileH
->
fGroup
[
pFileH
->
numOfFGroups
-
1
].
fileId
)
return
NULL
;
void
*
ptr
=
bsearch
((
void
*
)
&
fid
,
(
void
*
)(
pFileH
->
fGroup
),
pFileH
->
numOfFGroups
,
sizeof
(
SFileGroup
),
compFGroupKey
);
...
...
src/vnode/tsdb/src/tsdbMain.c
浏览文件 @
a03a9c06
...
...
@@ -8,6 +8,7 @@
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/sendfile.h>
#include <unistd.h>
// #include "taosdef.h"
...
...
@@ -45,6 +46,7 @@
#define TSDB_CFG_FILE_NAME "CONFIG"
#define TSDB_DATA_DIR_NAME "data"
#define TSDB_DEFAULT_FILE_BLOCK_ROW_OPTION 0.7
#define TSDB_MAX_LAST_FILE_SIZE (1024 * 1024 * 10) // 10M
enum
{
TSDB_REPO_STATE_ACTIVE
,
TSDB_REPO_STATE_CLOSED
,
TSDB_REPO_STATE_CONFIGURING
};
...
...
@@ -86,6 +88,10 @@ static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
);
static
void
*
tsdbCommitData
(
void
*
arg
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SSkipListIterator
**
iters
,
SDataCols
*
pCols
);
static
int
tsdbHasDataInRange
(
SSkipListIterator
*
pIter
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbHasDataToCommit
(
SSkipListIterator
**
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbWriteBlockToFileImpl
(
SFile
*
pFile
,
SDataCols
*
pCols
,
int
pointsToWrite
,
int64_t
*
offset
,
int32_t
*
len
,
int64_t
uid
);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...
...
@@ -326,10 +332,13 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
pRepo
->
tsdbCache
->
imem
=
pRepo
->
tsdbCache
->
mem
;
pRepo
->
tsdbCache
->
mem
=
NULL
;
pRepo
->
tsdbCache
->
curBlock
=
NULL
;
tsdbUnLockRepo
(
repo
);
// TODO: here should set as detached or use join for memory leak
pthread_create
(
&
(
pRepo
->
commitThread
),
NULL
,
tsdbCommitData
,
(
void
*
)
repo
);
tsdbUnLockRepo
(
repo
);
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
pthread_create
(
&
(
pRepo
->
commitThread
),
&
thattr
,
tsdbCommitData
,
(
void
*
)
repo
);
return
0
;
}
...
...
@@ -775,7 +784,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
tdAppendDataRowToDataCol
(
row
,
pCols
);
numOfRows
++
;
if
(
numOfRows
>
maxRowsToRead
)
break
;
if
(
numOfRows
>
=
maxRowsToRead
)
break
;
}
while
(
tSkipListIterNext
(
pIter
));
return
numOfRows
;
...
...
@@ -807,7 +816,9 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables)
}
if
(
!
tSkipListIterNext
(
iters
[
tid
]))
{
assert
(
false
);
// No data in this iterator
tSkipListDestroyIter
(
iters
[
tid
]);
iters
[
tid
]
=
NULL
;
}
}
...
...
@@ -832,8 +843,8 @@ static void *tsdbCommitData(void *arg) {
}
// Create a data column buffer for commit
SDataCols
*
pCols
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pCfg
->
maxRowsPerFileBlock
);
if
(
pCols
==
NULL
)
{
SDataCols
*
p
Data
Cols
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pCfg
->
maxRowsPerFileBlock
);
if
(
p
Data
Cols
==
NULL
)
{
// TODO: deal with the error
return
NULL
;
}
...
...
@@ -842,13 +853,15 @@ static void *tsdbCommitData(void *arg) {
int
efid
=
tsdbGetKeyFileId
(
pCache
->
imem
->
keyLast
,
pCfg
->
daysPerFile
,
pCfg
->
precision
);
for
(
int
fid
=
sfid
;
fid
<=
efid
;
fid
++
)
{
tsdbCommitToFile
(
pRepo
,
fid
,
iters
,
pCols
);
if
(
tsdbCommitToFile
(
pRepo
,
fid
,
iters
,
pDataCols
)
<
0
)
{
// TODO: deal with the error here
// assert(0);
}
}
tdFreeDataCols
(
pCols
);
tdFreeDataCols
(
p
Data
Cols
);
tsdbDestroyTableIters
(
iters
,
pCfg
->
maxTables
);
tsdbLockRepo
(
arg
);
tdListMove
(
pCache
->
imem
->
list
,
pCache
->
pool
.
memPool
);
free
(
pCache
->
imem
);
...
...
@@ -867,12 +880,12 @@ static void *tsdbCommitData(void *arg) {
}
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SSkipListIterator
**
iters
,
SDataCols
*
pCols
)
{
int
flag
=
0
;
int
isNewLastFile
=
0
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
SFile
t
File
,
lFile
;
SFile
h
File
,
lFile
;
SFileGroup
*
pGroup
=
NULL
;
SCompIdx
*
pIndices
=
NULL
;
SCompInfo
*
pCompInfo
=
NULL
;
...
...
@@ -883,102 +896,154 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
TSKEY
minKey
=
0
,
maxKey
=
0
;
tsdbGetKeyRangeOfFileId
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
// Check if there are data to commit to this file
int
hasDataToCommit
=
tsdbHasDataToCommit
(
iters
,
pCfg
->
maxTables
,
minKey
,
maxKey
);
if
(
!
hasDataToCommit
)
return
0
;
// No data to commit, just return
// Create and open files for commit
tsdbGetDataDirName
(
pRepo
,
dataDir
);
if
(
tsdbCreateFGroup
(
pFileH
,
dataDir
,
fid
,
pCfg
->
maxTables
)
<
0
)
{
/* TODO */
}
pGroup
=
tsdbOpenFilesForCommit
(
pFileH
,
fid
);
if
(
pGroup
==
NULL
)
{
/* TODO */
}
tsdbCreateFile
(
dataDir
,
fid
,
".h"
,
pCfg
->
maxTables
,
&
hFile
,
1
,
0
);
if
(
1
/*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/
)
{
// TODO: make it not to write the last file every time
tsdbCreateFile
(
dataDir
,
fid
,
".l"
,
pCfg
->
maxTables
,
&
lFile
,
0
,
0
);
isNewLastFile
=
1
;
}
// Load the SCompIdx
pIndices
=
(
SCompIdx
*
)
malloc
(
sizeof
(
SCompIdx
)
*
pCfg
->
maxTables
);
if
(
pIndices
==
NULL
)
{
/* TODO*/
}
if
(
tsdbLoadCompIdx
(
pGroup
,
(
void
*
)
pIndices
,
pCfg
->
maxTables
)
<
0
)
{
/* TODO */
}
lseek
(
hFile
.
fd
,
TSDB_FILE_HEAD_SIZE
+
sizeof
(
SCompIdx
)
*
pCfg
->
maxTables
,
SEEK_SET
);
// Loop to commit data in each table
for
(
int
tid
=
0
;
tid
<
pCfg
->
maxTables
;
tid
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
tid
];
SSkipListIterator
*
pIter
=
iters
[
tid
];
int
isLoadCompBlocks
=
0
;
SCompIdx
*
pIdx
=
&
pIndices
[
tid
]
;
if
(
pIter
==
NULL
)
continue
;
tdInitDataCols
(
pCols
,
pTable
->
schema
);
if
(
pTable
==
NULL
||
pIter
==
NULL
)
continue
;
int
numOfWrites
=
0
;
// while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) {
// break;
// if (!flag) {
// // There are data to commit to this file, we need to create/open it for read/write.
// // At the meantime, we set the flag to prevent further create/open operations
// if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
// // Open files for commit
// pGroup = tsdbOpenFilesForCommit(pFileH, fid);
// if (pGroup == NULL) {
// // TODO: deal with the ERROR here
// }
// // TODO: open .h file and if neccessary, open .l file
// {}
// pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
// if (pIndices == NULL) {
// // TODO: deal with the ERROR
// }
// // load the SCompIdx part
// if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
/* If no new data to write for this table, just write the old data to new file
* if there are.
*/
if
(
!
tsdbHasDataInRange
(
pIter
,
minKey
,
maxKey
))
{
// has old data
if
(
pIdx
->
offset
>
0
)
{
if
(
isNewLastFile
&&
pIdx
->
hasLast
)
{
// need to move the last block to new file
if
((
pCompInfo
=
(
SCompInfo
*
)
realloc
((
void
*
)
pCompInfo
,
pIdx
->
len
))
==
NULL
)
{
/* TODO */
}
if
(
tsdbLoadCompBlocks
(
pGroup
,
pIdx
,
(
void
*
)
pCompInfo
)
<
0
)
{
/* TODO */
}
// // TODO: sendfile those not need changed table content
// for (int ttid = 0; ttid < tid; ttid++) {
// // SCompIdx *pIdx = &pIndices[ttid];
// // if (pIdx->len > 0) {
// // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR);
// // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len);
// // }
// }
// flag = 1;
// }
tdInitDataCols
(
pCols
,
pTable
->
schema
);
// SCompIdx *pIdx = &pIndices[tid];
// /* The first time to write to the table, need to decide
// * if it is neccessary to load the SComplock part. If it
// * is needed, just load it, or, just use sendfile and
// * append it.
// */
// if (numOfWrites == 0 && pIdx->offset > 0) {
// if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
// if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
// // TODO: deal with the ERROR here
// }
// if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
// } else {
// // TODO: sendfile the prefix part
// }
// }
SCompBlock
*
pTBlock
=
TSDB_COMPBLOCK_AT
(
pCompInfo
,
pIdx
->
numOfSuperBlocks
);
int
nBlocks
=
0
;
// // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) {
// // // TODO: deal with the ERROR here
// // }
TSDB_COMPBLOCK_GET_START_AND_SIZE
(
pCompInfo
,
pTBlock
,
nBlocks
);
// // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
SCompBlock
tBlock
;
int64_t
toffset
,
tlen
;
tsdbLoadDataBlock
(
&
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
],
pTBlock
,
nBlocks
,
pCols
,
&
tBlock
);
tsdbWriteBlockToFileImpl
(
&
lFile
,
pCols
,
pCols
->
numOfPoints
,
&
toffset
,
tlen
,
pTable
->
tableId
.
uid
);
pTBlock
=
TSDB_COMPBLOCK_AT
(
pCompInfo
,
pIdx
->
numOfSuperBlocks
);
pTBlock
->
offset
=
toffset
;
pTBlock
->
len
=
tlen
;
pTBlock
->
numOfPoints
=
pCols
->
numOfPoints
;
pTBlock
->
numOfSubBlocks
=
1
;
// // if (1 /* the SCompBlock part is not loaded*/) {
// // // Append to .data file generate a SCompBlock and record it
// // } else {
// // }
pIdx
->
offset
=
lseek
(
hFile
.
fd
,
0
,
SEEK_CUR
);
if
(
nBlocks
>
1
)
{
pIdx
->
len
-=
(
sizeof
(
SCompBlock
)
*
nBlocks
);
}
write
(
hFile
.
fd
,
(
void
*
)
pCompInfo
,
pIdx
->
len
);
}
else
{
pIdx
->
offset
=
lseek
(
hFile
.
fd
,
0
,
SEEK_CUR
);
sendfile
(
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fd
,
hFile
.
fd
,
NULL
,
pIdx
->
len
);
hFile
.
info
.
size
+=
pIdx
->
len
;
}
}
continue
;
}
// // // TODO: need to reset the pCols
// Load SCompBlock part if neccessary
int
isCompBlockLoaded
=
0
;
if
(
pIdx
->
offset
>
0
)
{
if
(
pIdx
->
hasLast
||
tsdbHasDataInRange
(
pIter
,
minKey
,
pIdx
->
maxKey
))
{
// has last block || cache key overlap with commit key
pCompInfo
=
(
SCompInfo
*
)
realloc
((
void
*
)
pCompInfo
,
pIdx
->
len
+
sizeof
(
SCompBlock
)
*
100
);
if
(
tsdbLoadCompBlocks
(
pGroup
,
pIdx
,
(
void
*
)
pCompInfo
)
<
0
)
{
/* TODO */
}
if
(
pCompInfo
->
uid
==
pTable
->
tableId
.
uid
)
isCompBlockLoaded
=
1
;
}
else
{
// TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part
// and write those new blocks to it
}
}
// numOfWrites++;
// }
tdInitDataCols
(
pCols
,
pTable
->
schema
);
// if (pCols->numOfPoints > 0) {
// // TODO: still has data to commit, commit it
// }
int
maxRowsToRead
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
while
(
1
)
{
tsdbReadRowsFromCache
(
pIter
,
maxKey
,
maxRowsToRead
,
pCols
);
if
(
pCols
->
numOfPoints
==
0
)
break
;
int
pointsWritten
=
0
;
// { // TODO : try to write the block data to file
// if (!isCompBlockLoaded) { // Just append
// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file
// lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END);
// if (1/* SCompBlock part is loaded, write it to .head file*/) {
// // TODO
// } else {
// // TODO: use sendfile send the old part and append the newly added part
// if (isNewLastFile) { // write directly to .l file
// } else { // write directly to .last file
// }
// }
// } else { // Need to append
// // SCompBlock *pTBlock = NULL;
// }
// }
// pointsWritten = pCols->numOfPoints;
tdPopDataColsPoints
(
pCols
,
pointsWritten
);
maxRowsToRead
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
-
pCols
->
numOfPoints
;
}
// Write the SCompBlock part
if
(
isCompBlockLoaded
)
{
// merge the block into old and update pIdx
}
else
{
// sendfile the SCompBlock part and update the pIdx
}
}
// Write the SCompIdx part
if
(
lseek
(
hFile
.
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
{
/* TODO */
}
if
(
write
(
hFile
.
fd
,
(
void
*
)
pIndices
,
sizeof
(
SCompIdx
)
*
pCfg
->
maxTables
)
<
0
)
{
/* TODO */
}
// Close all files and return
if
(
flag
)
{
// TODO
// close the files
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
tsdbCloseFile
(
&
pGroup
->
files
[
type
]);
}
tsdbCloseFile
(
&
hFile
);
if
(
isNewLastFile
)
tsdbCloseFile
(
&
lFile
);
// TODO: replace the .head and .last file
rename
(
hFile
.
fname
,
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fname
);
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
info
=
hFile
.
info
;
if
(
isNewLastFile
)
{
rename
(
lFile
.
fname
,
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
].
fname
);
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
].
info
=
lFile
.
info
;
}
if
(
pIndices
)
free
(
pIndices
);
...
...
@@ -986,3 +1051,150 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
return
0
;
}
static
int
tsdbHasDataInRange
(
SSkipListIterator
*
pIter
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
if
(
pIter
==
NULL
)
return
0
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
0
;
SDataRow
row
=
SL_GET_NODE_DATA
(
node
);
if
(
dataRowKey
(
row
)
>=
minKey
&&
dataRowKey
(
row
)
<=
maxKey
)
return
1
;
return
0
;
}
static
int
tsdbHasDataToCommit
(
SSkipListIterator
**
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
for
(
int
i
=
0
;
i
<
nIters
;
i
++
)
{
SSkipListIterator
*
pIter
=
iters
[
i
];
if
(
tsdbHasDataInRange
(
pIter
,
minKey
,
maxKey
))
return
1
;
}
return
0
;
}
static
int
tsdbWriteBlockToFileImpl
(
SFile
*
pFile
,
SDataCols
*
pCols
,
int
pointsToWrite
,
int64_t
*
offset
,
int32_t
*
len
,
int64_t
uid
)
{
size_t
size
=
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pCols
->
numOfCols
;
SCompData
*
pCompData
=
(
SCompData
*
)
malloc
(
size
);
if
(
pCompData
==
NULL
)
return
-
1
;
pCompData
->
delimiter
=
TSDB_FILE_DELIMITER
;
pCompData
->
uid
=
uid
;
pCompData
->
numOfCols
=
pCols
->
numOfCols
;
*
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
*
len
=
size
;
int
toffset
=
size
;
for
(
int
iCol
=
0
;
iCol
<
pCols
->
numOfCols
;
iCol
++
)
{
SCompCol
*
pCompCol
=
pCompData
->
cols
+
iCol
;
SDataCol
*
pDataCol
=
pCols
->
cols
+
iCol
;
pCompCol
->
colId
=
pDataCol
->
colId
;
pCompCol
->
type
=
pDataCol
->
type
;
pCompCol
->
offset
=
toffset
;
// TODO: add compression
pCompCol
->
len
=
TYPE_BYTES
[
pCompCol
->
type
]
*
pointsToWrite
;
toffset
+=
pCompCol
->
len
;
}
// Write the block
if
(
write
(
pFile
->
fd
,
(
void
*
)
pCompData
,
size
)
<
0
)
goto
_err
;
*
len
+=
size
;
for
(
int
iCol
=
0
;
iCol
<
pCols
->
numOfCols
;
iCol
++
)
{
SDataCol
*
pDataCol
=
pCols
->
cols
+
iCol
;
SCompCol
*
pCompCol
=
pCompData
->
cols
+
iCol
;
if
(
write
(
pFile
->
fd
,
pDataCol
->
pData
,
pCompCol
->
len
)
<
0
)
goto
_err
;
*
len
+=
pCompCol
->
len
;
}
if
(
pCompData
==
NULL
)
free
((
void
*
)
pCompData
);
return
0
;
_err:
if
(
pCompData
==
NULL
)
free
((
void
*
)
pCompData
);
return
-
1
;
}
static
int
compareKeyBlock
(
const
void
*
arg1
,
const
void
*
arg2
)
{
TSKEY
key
=
*
(
TSKEY
*
)
arg1
;
SCompBlock
*
pBlock
=
(
SCompBlock
*
)
arg2
;
if
(
key
<
pBlock
->
keyFirst
)
{
return
-
1
;
}
else
if
(
key
>
pBlock
->
keyLast
)
{
return
1
;
}
return
0
;
}
static
int
tsdbWriteBlockToFile
(
STsdbRepo
*
pRepo
,
SFileGroup
*
pGroup
,
SCompIdx
*
pIdx
,
SCompInfo
*
pCompInfo
,
SDataCols
*
pCols
,
SCompBlock
*
pCompBlock
,
SFile
*
lFile
,
int64_t
uid
)
{
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
SCompData
*
pCompData
=
NULL
;
SFile
*
pFile
=
NULL
;
int
numOfPointsToWrite
=
0
;
int64_t
offset
=
0
;
int32_t
len
=
0
;
memset
((
void
*
)
pCompBlock
,
0
,
sizeof
(
SCompBlock
));
if
(
pCompInfo
==
NULL
)
{
// Just append the data block to .data or .l or .last file
numOfPointsToWrite
=
pCols
->
numOfPoints
;
if
(
pCols
->
numOfPoints
>
pCfg
->
minRowsPerFileBlock
)
{
// Write to .data file
pFile
=
&
(
pGroup
->
files
[
TSDB_FILE_TYPE_DATA
]);
}
else
{
// Write to .last or .l file
pCompBlock
->
last
=
1
;
if
(
lFile
)
{
pFile
=
lFile
;
}
else
{
pFile
=
&
(
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
]);
}
}
tsdbWriteBlockToFileImpl
(
pFile
,
pCols
,
numOfPointsToWrite
,
&
offset
,
&
len
,
uid
);
pCompBlock
->
offset
=
offset
;
pCompBlock
->
len
=
len
;
pCompBlock
->
algorithm
=
2
;
// TODO : add to configuration
pCompBlock
->
sversion
=
pCols
->
sversion
;
pCompBlock
->
numOfPoints
=
pCols
->
numOfPoints
;
pCompBlock
->
numOfSubBlocks
=
1
;
pCompBlock
->
numOfCols
=
pCols
->
numOfCols
;
pCompBlock
->
keyFirst
=
dataColsKeyFirst
(
pCols
);
pCompBlock
->
keyLast
=
dataColsKeyLast
(
pCols
);
}
else
{
// Need to merge the block to either the last block or the other block
TSKEY
keyFirst
=
dataColsKeyFirst
(
pCols
);
SCompBlock
*
pMergeBlock
=
NULL
;
// Search the block to merge in
void
*
ptr
=
taosbsearch
((
void
*
)
&
keyFirst
,
(
void
*
)(
pCompInfo
->
blocks
),
sizeof
(
SCompBlock
),
pIdx
->
numOfSuperBlocks
,
compareKeyBlock
,
TD_GE
);
if
(
ptr
==
NULL
)
{
// No block greater or equal than the key, but there are data in the .last file, need to merge the last file block
// and merge the data
pMergeBlock
=
TSDB_COMPBLOCK_AT
(
pCompInfo
,
pIdx
->
numOfSuperBlocks
-
1
);
}
else
{
pMergeBlock
=
(
SCompBlock
*
)
ptr
;
}
if
(
pMergeBlock
->
last
)
{
if
(
pMergeBlock
->
last
+
pCols
->
numOfPoints
>
pCfg
->
minRowsPerFileBlock
)
{
// Need to load the data from .last and combine data in pCols to write to .data file
}
else
{
// Just append the block to .last or .l file
if
(
lFile
)
{
// read the block from .last file and merge with pCols, write to .l file
}
else
{
// tsdbWriteBlockToFileImpl();
}
}
}
else
{
// The block need to merge in .data file
}
}
return
numOfPointsToWrite
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录