Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
240a201b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
240a201b
编写于
3月 26, 2020
作者:
H
hzcheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-34
上级
81f27c37
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
315 addition
and
29 deletion
+315
-29
src/common/inc/dataformat.h
src/common/inc/dataformat.h
+9
-8
src/common/src/dataformat.c
src/common/src/dataformat.c
+1
-0
src/util/inc/tutil.h
src/util/inc/tutil.h
+8
-0
src/util/src/tutil.c
src/util/src/tutil.c
+69
-0
src/vnode/tsdb/inc/tsdbFile.h
src/vnode/tsdb/inc/tsdbFile.h
+20
-10
src/vnode/tsdb/src/tsdbFile.c
src/vnode/tsdb/src/tsdbFile.c
+102
-5
src/vnode/tsdb/src/tsdbMain.c
src/vnode/tsdb/src/tsdbMain.c
+106
-6
未找到文件。
src/common/inc/dataformat.h
浏览文件 @
240a201b
...
@@ -106,6 +106,7 @@ SDataRow tdDataRowDup(SDataRow row);
...
@@ -106,6 +106,7 @@ SDataRow tdDataRowDup(SDataRow row);
// ----------------- Data column structure
// ----------------- Data column structure
typedef
struct
SDataCol
{
typedef
struct
SDataCol
{
int8_t
type
;
int8_t
type
;
int16_t
colId
;
int
bytes
;
int
bytes
;
int
len
;
int
len
;
int
offset
;
int
offset
;
...
@@ -122,9 +123,9 @@ typedef struct {
...
@@ -122,9 +123,9 @@ typedef struct {
SDataCol
cols
[];
SDataCol
cols
[];
}
SDataCols
;
}
SDataCols
;
#define keyCol(
cols) (&((cols)->cols[0]))
// Key column
#define keyCol(
pCols) (&((pCols)->cols[0]))
// Key column
#define dataColsKeyFirst(
cols) ((int64_t *)(keyCol(c
ols)->pData))[0]
#define dataColsKeyFirst(
pCols) ((int64_t *)(keyCol(pC
ols)->pData))[0]
#define dataColsKeyLast(
cols) ((int64_t *)(keyCol(cols)->pData))[(c
ols)->numOfPoints - 1]
#define dataColsKeyLast(
pCols) ((int64_t *)(keyCol(pCols)->pData))[(pC
ols)->numOfPoints - 1]
SDataCols
*
tdNewDataCols
(
int
maxRowSize
,
int
maxCols
,
int
maxRows
);
SDataCols
*
tdNewDataCols
(
int
maxRowSize
,
int
maxCols
,
int
maxRows
);
void
tdResetDataCols
(
SDataCols
*
pCols
);
void
tdResetDataCols
(
SDataCols
*
pCols
);
...
...
src/common/src/dataformat.c
浏览文件 @
240a201b
...
@@ -324,6 +324,7 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
...
@@ -324,6 +324,7 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
pCols
->
cols
[
i
].
type
=
colType
(
schemaColAt
(
pSchema
,
i
));
pCols
->
cols
[
i
].
type
=
colType
(
schemaColAt
(
pSchema
,
i
));
pCols
->
cols
[
i
].
bytes
=
colBytes
(
schemaColAt
(
pSchema
,
i
));
pCols
->
cols
[
i
].
bytes
=
colBytes
(
schemaColAt
(
pSchema
,
i
));
pCols
->
cols
[
i
].
offset
=
colOffset
(
schemaColAt
(
pSchema
,
i
));
pCols
->
cols
[
i
].
offset
=
colOffset
(
schemaColAt
(
pSchema
,
i
));
pCols
->
cols
[
i
].
colId
=
colColId
(
schemaColAt
(
pSchema
,
i
));
}
}
return
pCols
;
return
pCols
;
...
...
src/util/inc/tutil.h
浏览文件 @
240a201b
...
@@ -176,6 +176,14 @@ uint32_t ip2uint(const char *const ip_addr);
...
@@ -176,6 +176,14 @@ uint32_t ip2uint(const char *const ip_addr);
void
taosSetAllocMode
(
int
mode
,
const
char
*
path
,
bool
autoDump
);
void
taosSetAllocMode
(
int
mode
,
const
char
*
path
,
bool
autoDump
);
void
taosDumpMemoryLeak
();
void
taosDumpMemoryLeak
();
#define TD_EQ 0x1
#define TD_GT 0x2
#define TD_LT 0x4
#define TD_GE (TD_EQ | TD_GT)
#define TD_LE (TD_EQ | TD_LT)
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
size_t
nmemb
,
size_t
size
,
int
(
*
compar
)(
const
void
*
,
const
void
*
),
int
flags
);
#ifdef TAOS_MEM_CHECK
#ifdef TAOS_MEM_CHECK
void
*
taos_malloc
(
size_t
size
,
const
char
*
file
,
uint32_t
line
);
void
*
taos_malloc
(
size_t
size
,
const
char
*
file
,
uint32_t
line
);
...
...
src/util/src/tutil.c
浏览文件 @
240a201b
...
@@ -617,3 +617,72 @@ char *taosCharsetReplace(char *charsetstr) {
...
@@ -617,3 +617,72 @@ char *taosCharsetReplace(char *charsetstr) {
return
strdup
(
charsetstr
);
return
strdup
(
charsetstr
);
}
}
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
size_t
nmemb
,
size_t
size
,
int
(
*
compar
)(
const
void
*
,
const
void
*
),
int
flags
)
{
// TODO: need to check the correctness of this function
int
l
=
0
;
int
r
=
nmemb
;
int
idx
=
0
;
int
comparison
;
if
(
flags
==
TD_EQ
)
{
return
bsearch
(
key
,
base
,
nmemb
,
size
,
compar
);
}
else
if
(
flags
==
TD_GE
)
{
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
0
))
<=
0
)
return
elePtrAt
(
base
,
size
,
0
);
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
nmemb
-
1
))
>
0
)
return
NULL
;
while
(
l
<
r
)
{
idx
=
(
l
+
r
)
/
2
;
comparison
=
(
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
));
if
(
comparison
<
0
)
{
r
=
idx
;
}
else
if
(
comparison
>
0
)
{
l
=
idx
+
1
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
);
}
}
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
)
<
0
))
{
return
elePtrAt
(
base
,
size
,
idx
);
}
else
{
if
(
idx
+
1
>
nmemb
-
1
)
{
return
NULL
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
+
1
);
}
}
}
else
if
(
flags
==
TD_LE
)
{
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
nmemb
-
1
))
>=
0
)
return
elePtrAt
(
base
,
size
,
nmemb
-
1
);
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
0
))
<
0
)
return
NULL
;
while
(
l
<
r
)
{
idx
=
(
l
+
r
)
/
2
;
comparison
=
(
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
));
if
(
comparison
<
0
)
{
r
=
idx
;
}
else
if
(
comparison
>
0
)
{
l
=
idx
+
1
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
);
}
}
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
))
>
0
)
{
return
elePtrAt
(
base
,
size
,
idx
);
}
else
{
if
(
idx
==
0
)
{
return
NULL
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
-
1
);
}
}
}
else
{
assert
(
0
);
return
NULL
;
}
return
NULL
;
}
src/vnode/tsdb/inc/tsdbFile.h
浏览文件 @
240a201b
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
#include <stdint.h>
#include <stdint.h>
#include "dataformat.h"
#include "taosdef.h"
#include "taosdef.h"
#include "tglobalcfg.h"
#include "tglobalcfg.h"
...
@@ -69,19 +70,25 @@ typedef struct {
...
@@ -69,19 +70,25 @@ typedef struct {
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
int
maxFiles
);
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
int
maxFiles
);
void
tsdbCloseFileH
(
STsdbFileH
*
pFileH
);
void
tsdbCloseFileH
(
STsdbFileH
*
pFileH
);
int
tsdbCreateFGroup
(
STsdbFileH
*
pFileH
,
char
*
dataDir
,
int
fid
,
int
maxTables
);
int
tsdbCreateFGroup
(
STsdbFileH
*
pFileH
,
char
*
dataDir
,
int
fid
,
int
maxTables
);
int
tsdbOpenFile
(
SFile
*
pFile
,
int
oflag
);
SFileGroup
*
tsdbOpenFilesForCommit
(
STsdbFileH
*
pFileH
,
int
fid
);
int
tsdbRemoveFileGroup
(
STsdbFileH
*
pFile
,
int
fid
);
int
tsdbRemoveFileGroup
(
STsdbFileH
*
pFile
,
int
fid
);
typedef
struct
{
typedef
struct
{
int32_t
len
;
int32_t
len
;
int32_t
padding
;
// For padding purpose
int32_t
offset
;
int64_t
offset
;
int32_t
hasLast
:
1
;
}
SCompIdx
;
int32_t
numOfSuperBlocks
:
31
;
int32_t
checksum
;
TSKEY
maxKey
;
}
SCompIdx
;
/* sizeof(SCompIdx) = 24 */
/**
/**
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
* if numOfSubBlocks == 0, then the SCompBlock is a sub-block
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
* if numOfSubBlocks >= 1, then the SCompBlock is a super-block
* - if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
* the data block offset and length
* the data block offset and length
* if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
*
-
if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
* binary
* binary
*/
*/
typedef
struct
{
typedef
struct
{
...
@@ -101,11 +108,12 @@ typedef struct {
...
@@ -101,11 +108,12 @@ typedef struct {
int32_t
delimiter
;
// For recovery usage
int32_t
delimiter
;
// For recovery usage
int32_t
checksum
;
// TODO: decide if checksum logic in this file or make it one API
int32_t
checksum
;
// TODO: decide if checksum logic in this file or make it one API
int64_t
uid
;
int64_t
uid
;
int32_t
padding
;
// For padding purpose
int32_t
numOfBlocks
;
// TODO: make the struct padding
SCompBlock
blocks
[];
SCompBlock
blocks
[];
}
SCompInfo
;
}
SCompInfo
;
int
tsdbLoadCompIdx
(
SFileGroup
*
pGroup
,
void
*
buf
,
int
maxTables
);
int
tsdbLoadCompBlocks
(
SFileGroup
*
pGroup
,
SCompIdx
*
pIdx
,
void
*
buf
);
// TODO: take pre-calculation into account
// TODO: take pre-calculation into account
typedef
struct
{
typedef
struct
{
int16_t
colId
;
// Column ID
int16_t
colId
;
// Column ID
...
@@ -122,6 +130,8 @@ typedef struct {
...
@@ -122,6 +130,8 @@ typedef struct {
SCompCol
cols
[];
SCompCol
cols
[];
}
SCompData
;
}
SCompData
;
int
tsdbWriteBlockToFile
(
SFileGroup
*
pGroup
,
SCompInfo
*
pCompInfo
,
SCompIdx
*
pIdx
,
int
isMerge
,
SCompBlock
*
pBlock
,
SDataCols
*
pCols
);
void
tsdbGetKeyRangeOfFileId
(
int32_t
daysPerFile
,
int8_t
precision
,
int32_t
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
void
tsdbGetKeyRangeOfFileId
(
int32_t
daysPerFile
,
int8_t
precision
,
int32_t
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/vnode/tsdb/src/tsdbFile.c
浏览文件 @
240a201b
...
@@ -39,6 +39,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname);
...
@@ -39,6 +39,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname);
static
int
tsdbCreateFile
(
char
*
dataDir
,
int
fileId
,
int8_t
type
,
int
maxTables
,
SFile
*
pFile
);
static
int
tsdbCreateFile
(
char
*
dataDir
,
int
fileId
,
int8_t
type
,
int
maxTables
,
SFile
*
pFile
);
static
int
tsdbWriteFileHead
(
SFile
*
pFile
);
static
int
tsdbWriteFileHead
(
SFile
*
pFile
);
static
int
tsdbWriteHeadFileIdx
(
SFile
*
pFile
,
int
maxTables
);
static
int
tsdbWriteHeadFileIdx
(
SFile
*
pFile
,
int
maxTables
);
static
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
);
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
int
maxFiles
)
{
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
int
maxFiles
)
{
STsdbFileH
*
pFileH
=
(
STsdbFileH
*
)
calloc
(
1
,
sizeof
(
STsdbFileH
)
+
sizeof
(
SFileGroup
)
*
maxFiles
);
STsdbFileH
*
pFileH
=
(
STsdbFileH
*
)
calloc
(
1
,
sizeof
(
STsdbFileH
)
+
sizeof
(
SFileGroup
)
*
maxFiles
);
...
@@ -70,9 +71,7 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables)
...
@@ -70,9 +71,7 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables)
SFileGroup
fGroup
;
SFileGroup
fGroup
;
SFileGroup
*
pFGroup
=
&
fGroup
;
SFileGroup
*
pFGroup
=
&
fGroup
;
if
(
fid
<
TSDB_MIN_FILE_ID
(
pFileH
)
||
fid
>
TSDB_MAX_FILE_ID
(
pFileH
)
||
if
(
tsdbSearchFGroup
(
pFileH
,
fid
)
==
NULL
)
{
bsearch
((
void
*
)
&
fid
,
(
void
*
)(
pFileH
->
fGroup
),
pFileH
->
numOfFGroups
,
sizeof
(
SFileGroup
),
compFGroupKey
)
==
NULL
)
{
pFGroup
->
fileId
=
fid
;
pFGroup
->
fileId
=
fid
;
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
tsdbCreateFile
(
dataDir
,
fid
,
type
,
maxTables
,
&
(
pFGroup
->
files
[
type
]))
<
0
)
{
if
(
tsdbCreateFile
(
dataDir
,
fid
,
type
,
maxTables
,
&
(
pFGroup
->
files
[
type
]))
<
0
)
{
...
@@ -107,6 +106,86 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
...
@@ -107,6 +106,86 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
return
0
;
return
0
;
}
}
int
tsdbLoadCompIdx
(
SFileGroup
*
pGroup
,
void
*
buf
,
int
maxTables
)
{
SFile
*
pFile
=
&
(
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
]);
if
(
lseek
(
pFile
->
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
return
-
1
;
if
(
read
(
pFile
->
fd
,
buf
,
sizeof
(
SCompIdx
)
*
maxTables
)
<
0
)
return
-
1
;
// TODO: need to check the correctness
return
0
;
}
int
tsdbLoadCompBlocks
(
SFileGroup
*
pGroup
,
SCompIdx
*
pIdx
,
void
*
buf
)
{
SFile
*
pFile
=
&
(
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
]);
if
(
lseek
(
pFile
->
fd
,
pIdx
->
offset
,
SEEK_SET
)
<
0
)
return
-
1
;
if
(
read
(
pFile
->
fd
,
buf
,
pIdx
->
len
)
<
0
)
return
-
1
;
// TODO: need to check the correctness
return
0
;
}
static
int
tsdbWriteBlockToFileImpl
(
SFile
*
pFile
,
// File to write
SDataCols
*
pCols
,
// Data column buffer
int
numOfPointsToWrie
,
// Number of points to write to the file
SCompBlock
*
pBlock
// SCompBlock to hold block information to return
)
{
// pBlock->last = 0;
// pBlock->offset = lseek(pFile->fd, 0, SEEK_END);
// // pBlock->algorithm = ;
// pBlock->numOfPoints = pCols->numOfPoints;
// // pBlock->sversion = ;
// // pBlock->len = ;
// pBlock->numOfSubBlocks = 1;
// pBlock->keyFirst = dataColsKeyFirst(pCols);
// pBlock->keyLast = dataColsKeyLast(pCols);
// for (int i = 0; i < pCols->numOfCols; i++) {
// // TODO: if all col value is NULL, do not save it
// pBlock->numOfCols++;
// pCompData->numOfCols++;
// SCompCol *pCompCol = pCompData->cols + i;
// pCompCol->colId = pCols->cols[i].colId;
// pCompCol->type = pCols->cols[i].type;
// // pCompCol->len = ;
// // pCompCol->offset = ;
// }
return
0
;
}
int
tsdbWriteBlockToFile
(
SFileGroup
*
pGroup
,
SCompInfo
*
pCompInfo
,
SCompIdx
*
pIdx
,
int
isMerge
,
SCompBlock
*
pBlock
,
SDataCols
*
pCols
)
{
memset
((
void
*
)
pBlock
,
0
,
sizeof
(
SCompBlock
));
SFile
*
pFile
=
NULL
;
SCompData
*
pCompData
=
(
SCompData
*
)
malloc
(
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pCols
->
numOfCols
);
if
(
pCompData
==
NULL
)
return
-
1
;
pCompData
->
delimiter
=
TSDB_FILE_DELIMITER
;
// pCompData->uid = ;
if
(
isMerge
)
{
TSKEY
keyFirst
=
dataColsKeyFirst
(
pCols
);
// 1. Binary search the block the data can merged into
if
(
1
/* the data should only merged into last file */
)
{
}
else
{
}
}
else
{
// Write directly to the file without merge
if
(
1
/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/
)
{
// TODO: write the data to the last file
}
else
{
// TODO: wirte the data to the data file
}
}
// TODO: need to update pIdx
if
(
pCompData
)
free
(
pCompData
);
return
0
;
}
static
int
compFGroupKey
(
const
void
*
key
,
const
void
*
fgroup
)
{
static
int
compFGroupKey
(
const
void
*
key
,
const
void
*
fgroup
)
{
int
fid
=
*
(
int
*
)
key
;
int
fid
=
*
(
int
*
)
key
;
SFileGroup
*
pFGroup
=
(
SFileGroup
*
)
fgroup
;
SFileGroup
*
pFGroup
=
(
SFileGroup
*
)
fgroup
;
...
@@ -158,7 +237,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname)
...
@@ -158,7 +237,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname)
return
0
;
return
0
;
}
}
static
int
tsdbOpenFileForWrit
e
(
SFile
*
pFile
,
int
oflag
)
{
// TODO: change the function
int
tsdbOpenFil
e
(
SFile
*
pFile
,
int
oflag
)
{
// TODO: change the function
if
(
TSDB_IS_FILE_OPENED
(
pFile
))
return
-
1
;
if
(
TSDB_IS_FILE_OPENED
(
pFile
))
return
-
1
;
pFile
->
fd
=
open
(
pFile
->
fname
,
oflag
,
0755
);
pFile
->
fd
=
open
(
pFile
->
fname
,
oflag
,
0755
);
...
@@ -167,6 +246,16 @@ static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the f
...
@@ -167,6 +246,16 @@ static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the f
return
0
;
return
0
;
}
}
SFileGroup
*
tsdbOpenFilesForCommit
(
STsdbFileH
*
pFileH
,
int
fid
)
{
SFileGroup
*
pGroup
=
tsdbSearchFGroup
(
pFileH
,
fid
);
if
(
pGroup
==
NULL
)
return
NULL
;
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
tsdbOpenFile
(
&
(
pGroup
->
files
[
type
]),
O_RDWR
);
}
return
pGroup
;
}
static
int
tsdbCloseFile
(
SFile
*
pFile
)
{
static
int
tsdbCloseFile
(
SFile
*
pFile
)
{
if
(
!
TSDB_IS_FILE_OPENED
(
pFile
))
return
-
1
;
if
(
!
TSDB_IS_FILE_OPENED
(
pFile
))
return
-
1
;
int
ret
=
close
(
pFile
->
fd
);
int
ret
=
close
(
pFile
->
fd
);
...
@@ -186,7 +275,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
...
@@ -186,7 +275,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
return
-
1
;
return
-
1
;
}
}
if
(
tsdbOpenFile
ForWrite
(
pFile
,
O_WRONLY
|
O_CREAT
)
<
0
)
{
if
(
tsdbOpenFile
(
pFile
,
O_WRONLY
|
O_CREAT
)
<
0
)
{
// TODO: deal with the ERROR here
// TODO: deal with the ERROR here
return
-
1
;
return
-
1
;
}
}
...
@@ -213,3 +302,11 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file
...
@@ -213,3 +302,11 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file
*
minKey
=
fileId
*
daysPerFile
*
tsMsPerDay
[
precision
];
*
minKey
=
fileId
*
daysPerFile
*
tsMsPerDay
[
precision
];
*
maxKey
=
*
minKey
+
daysPerFile
*
tsMsPerDay
[
precision
]
-
1
;
*
maxKey
=
*
minKey
+
daysPerFile
*
tsMsPerDay
[
precision
]
-
1
;
}
}
static
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
)
{
if
(
pFileH
->
numOfFGroups
==
0
||
fid
<
pFileH
->
fGroup
[
0
].
fileId
||
fid
>
pFileH
->
fGroup
[
pFileH
->
numOfFGroups
-
1
].
fileId
)
return
NULL
;
void
*
ptr
=
bsearch
((
void
*
)
&
fid
,
(
void
*
)(
pFileH
->
fGroup
),
pFileH
->
numOfFGroups
,
sizeof
(
SFileGroup
),
compFGroupKey
);
if
(
ptr
==
NULL
)
return
NULL
;
return
(
SFileGroup
*
)
ptr
;
}
\ No newline at end of file
src/vnode/tsdb/src/tsdbMain.c
浏览文件 @
240a201b
...
@@ -764,6 +764,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
...
@@ -764,6 +764,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
static
int
tsdbReadRowsFromCache
(
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
)
{
static
int
tsdbReadRowsFromCache
(
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
)
{
int
numOfRows
=
0
;
int
numOfRows
=
0
;
do
{
do
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
break
;
if
(
node
==
NULL
)
break
;
...
@@ -776,6 +777,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
...
@@ -776,6 +777,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
numOfRows
++
;
numOfRows
++
;
if
(
numOfRows
>
maxRowsToRead
)
break
;
if
(
numOfRows
>
maxRowsToRead
)
break
;
}
while
(
tSkipListIterNext
(
pIter
));
}
while
(
tSkipListIterNext
(
pIter
));
return
numOfRows
;
return
numOfRows
;
}
}
...
@@ -865,24 +867,122 @@ static void *tsdbCommitData(void *arg) {
...
@@ -865,24 +867,122 @@ static void *tsdbCommitData(void *arg) {
}
}
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SSkipListIterator
**
iters
,
SDataCols
*
pCols
)
{
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SSkipListIterator
**
iters
,
SDataCols
*
pCols
)
{
int
flag
=
0
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
SFile
tFile
,
lFile
;
SFileGroup
*
pGroup
=
NULL
;
SCompIdx
*
pIndices
=
NULL
;
SCompInfo
*
pCompInfo
=
NULL
;
size_t
compInfoSize
=
0
;
SCompBlock
compBlock
;
SCompBlock
*
pBlock
=
&
compBlock
;
TSKEY
minKey
=
0
,
maxKey
=
0
;
TSKEY
minKey
=
0
,
maxKey
=
0
;
tsdbGetKeyRangeOfFileId
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
tsdbGetKeyRangeOfFileId
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
for
(
int
tid
=
0
;
tid
<
pCfg
->
maxTables
;
tid
++
)
{
for
(
int
tid
=
0
;
tid
<
pCfg
->
maxTables
;
tid
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
tid
];
STable
*
pTable
=
pMeta
->
tables
[
tid
];
SSkipListIterator
*
pIter
=
iters
[
tid
];
SSkipListIterator
*
pIter
=
iters
[
tid
];
int
isLoadCompBlocks
=
0
;
if
(
pIter
==
NULL
)
continue
;
if
(
pIter
==
NULL
)
continue
;
tdInitDataCols
(
pCols
,
pTable
->
schema
);
tdInitDataCols
(
pCols
,
pTable
->
schema
);
while
(
tsdbReadRowsFromCache
(
pIter
,
maxKey
,
pCfg
->
maxRowsPerFileBlock
,
pCols
))
{
int
numOfWrites
=
0
;
// while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) {
// break;
// if (!flag) {
// // There are data to commit to this file, we need to create/open it for read/write.
// // At the meantime, we set the flag to prevent further create/open operations
// if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
// // Open files for commit
// pGroup = tsdbOpenFilesForCommit(pFileH, fid);
// if (pGroup == NULL) {
// // TODO: deal with the ERROR here
// }
// // TODO: open .h file and if neccessary, open .l file
// {}
// pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
// if (pIndices == NULL) {
// // TODO: deal with the ERROR
// }
// // load the SCompIdx part
// if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
// // TODO: sendfile those not need changed table content
// for (int ttid = 0; ttid < tid; ttid++) {
// // SCompIdx *pIdx = &pIndices[ttid];
// // if (pIdx->len > 0) {
// // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR);
// // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len);
// // }
// }
// flag = 1;
// }
// SCompIdx *pIdx = &pIndices[tid];
// /* The first time to write to the table, need to decide
// * if it is neccessary to load the SComplock part. If it
// * is needed, just load it, or, just use sendfile and
// * append it.
// */
// if (numOfWrites == 0 && pIdx->offset > 0) {
// if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
// if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
// // TODO: deal with the ERROR here
// }
// if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
// } else {
// // TODO: sendfile the prefix part
// }
// }
// // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) {
// // // TODO: deal with the ERROR here
// // }
// // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
// // if (1 /* the SCompBlock part is not loaded*/) {
// // // Append to .data file generate a SCompBlock and record it
// // } else {
// // }
// // // TODO: need to reset the pCols
// numOfWrites++;
// }
// if (pCols->numOfPoints > 0) {
// // TODO: still has data to commit, commit it
// }
// if (1/* SCompBlock part is loaded, write it to .head file*/) {
// // TODO
// } else {
// // TODO: use sendfile send the old part and append the newly added part
// }
}
// Write the SCompIdx part
// Close all files and return
if
(
flag
)
{
// TODO
// TODO
int
k
=
0
;
}
}
}
if
(
pIndices
)
free
(
pIndices
);
if
(
pCompInfo
)
free
(
pCompInfo
);
return
0
;
return
0
;
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录