Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
72d7f581
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
72d7f581
编写于
3月 18, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '2.0' into refact/slguan
上级
053280da
cb1602f7
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
290 addition
and
110 deletion
+290
-110
src/common/inc/dataformat.h
src/common/inc/dataformat.h
+5
-30
src/common/src/dataformat.c
src/common/src/dataformat.c
+52
-38
src/inc/taosdef.h
src/inc/taosdef.h
+11
-0
src/vnode/tsdb/inc/tsdbMeta.h
src/vnode/tsdb/inc/tsdbMeta.h
+1
-1
src/vnode/tsdb/inc/tsdbMetaFile.h
src/vnode/tsdb/inc/tsdbMetaFile.h
+12
-6
src/vnode/tsdb/src/tsdbMain.c
src/vnode/tsdb/src/tsdbMain.c
+12
-18
src/vnode/tsdb/src/tsdbMeta.c
src/vnode/tsdb/src/tsdbMeta.c
+93
-11
src/vnode/tsdb/src/tsdbMetaFile.c
src/vnode/tsdb/src/tsdbMetaFile.c
+62
-6
src/vnode/tsdb/tests/tsdbTests.cpp
src/vnode/tsdb/tests/tsdbTests.cpp
+42
-0
未找到文件。
src/common/inc/dataformat.h
浏览文件 @
72d7f581
...
...
@@ -51,19 +51,21 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes);
// ----------------- TSDB SCHEMA DEFINITION
typedef
struct
{
int
numOfCols
;
// Number of columns appended
int
totalCols
;
// Total columns allocated
int
padding
;
// Total columns allocated
STColumn
columns
[];
}
STSchema
;
#define schemaNCols(s) ((s)->numOfCols)
#define schemaTCols(s) ((s)->totalCols)
#define schemaColAt(s, i) ((s)->columns + i)
STSchema
*
tdNewSchema
(
int32_t
nCols
);
int
tdSchemaAppendCol
(
STSchema
*
pSchema
,
int8_t
type
,
int16_t
colId
,
int
16
_t
bytes
);
int
tdSchemaAppendCol
(
STSchema
*
pSchema
,
int8_t
type
,
int16_t
colId
,
int
32
_t
bytes
);
STSchema
*
tdDupSchema
(
STSchema
*
pSchema
);
void
tdFreeSchema
(
STSchema
*
pSchema
);
void
tdUpdateSchema
(
STSchema
*
pSchema
);
int
tdGetSchemaEncodeSize
(
STSchema
*
pSchema
);
void
*
tdEncodeSchema
(
void
*
dst
,
STSchema
*
pSchema
);
STSchema
*
tdDecodeSchema
(
void
**
psrc
);
// ----------------- Data row structure
...
...
@@ -99,33 +101,6 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol);
void
tdDataRowReset
(
SDataRow
row
,
STSchema
*
pSchema
);
SDataRow
tdDataRowDup
(
SDataRow
row
);
/* Data rows definition, the format of it is like below:
* +---------+-----------------------+--------+-----------------------+
* | int32_t | | | |
* +---------+-----------------------+--------+-----------------------+
* | len | SDataRow | .... | SDataRow |
* +---------+-----------------------+--------+-----------------------+
*/
typedef
void
*
SDataRows
;
#define TD_DATA_ROWS_HEAD_LEN sizeof(int32_t)
#define dataRowsLen(rs) (*(int32_t *)(rs))
#define dataRowsSetLen(rs, l) (dataRowsLen(rs) = (l))
#define dataRowsInit(rs) dataRowsSetLen(rs, sizeof(int32_t))
void
tdDataRowsAppendRow
(
SDataRows
rows
,
SDataRow
row
);
// Data rows iterator
typedef
struct
{
int32_t
totalLen
;
int32_t
len
;
SDataRow
row
;
}
SDataRowsIter
;
void
tdInitSDataRowsIter
(
SDataRows
rows
,
SDataRowsIter
*
pIter
);
SDataRow
tdDataRowsNext
(
SDataRowsIter
*
pIter
);
/* Data column definition
* +---------+---------+-----------------------+
* | int32_t | int32_t | |
...
...
src/common/src/dataformat.c
浏览文件 @
72d7f581
...
...
@@ -91,10 +91,9 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) {
STSchema
*
tdNewSchema
(
int32_t
nCols
)
{
int32_t
size
=
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
nCols
;
STSchema
*
pSchema
=
(
STSchema
*
)
malloc
(
size
);
STSchema
*
pSchema
=
(
STSchema
*
)
calloc
(
1
,
size
);
if
(
pSchema
==
NULL
)
return
NULL
;
pSchema
->
numOfCols
=
0
;
pSchema
->
totalCols
=
nCols
;
return
pSchema
;
}
...
...
@@ -102,8 +101,8 @@ STSchema *tdNewSchema(int32_t nCols) {
/**
* Append a column to the schema
*/
int
tdSchemaAppendCol
(
STSchema
*
pSchema
,
int8_t
type
,
int16_t
colId
,
int
16
_t
bytes
)
{
if
(
pSchema
->
numOfCols
>=
pSchema
->
totalCols
)
return
-
1
;
int
tdSchemaAppendCol
(
STSchema
*
pSchema
,
int8_t
type
,
int16_t
colId
,
int
32
_t
bytes
)
{
//
if (pSchema->numOfCols >= pSchema->totalCols) return -1;
if
(
!
isValidDataType
(
type
,
0
))
return
-
1
;
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
schemaNCols
(
pSchema
));
...
...
@@ -159,6 +158,53 @@ void tdUpdateSchema(STSchema *pSchema) {
}
}
/**
* Return the size of encoded schema
*/
int
tdGetSchemaEncodeSize
(
STSchema
*
pSchema
)
{
return
sizeof
(
STSchema
)
+
schemaNCols
(
pSchema
)
*
(
T_MEMBER_SIZE
(
STColumn
,
type
)
+
T_MEMBER_SIZE
(
STColumn
,
colId
)
+
T_MEMBER_SIZE
(
STColumn
,
bytes
));
}
/**
* Encode a schema to dst, and return the next pointer
*/
void
*
tdEncodeSchema
(
void
*
dst
,
STSchema
*
pSchema
)
{
T_APPEND_MEMBER
(
dst
,
pSchema
,
STSchema
,
numOfCols
);
for
(
int
i
=
0
;
i
<
schemaNCols
(
pSchema
);
i
++
)
{
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
i
);
T_APPEND_MEMBER
(
dst
,
pCol
,
STColumn
,
type
);
T_APPEND_MEMBER
(
dst
,
pCol
,
STColumn
,
colId
);
T_APPEND_MEMBER
(
dst
,
pCol
,
STColumn
,
bytes
);
}
return
dst
;
}
/**
* Decode a schema from a binary.
*/
STSchema
*
tdDecodeSchema
(
void
**
psrc
)
{
int
numOfCols
=
0
;
T_READ_MEMBER
(
*
psrc
,
int
,
numOfCols
);
STSchema
*
pSchema
=
tdNewSchema
(
numOfCols
);
if
(
pSchema
==
NULL
)
return
NULL
;
for
(
int
i
=
0
;
i
<
numOfCols
;
i
++
)
{
int8_t
type
=
0
;
int16_t
colId
=
0
;
int32_t
bytes
=
0
;
T_READ_MEMBER
(
*
psrc
,
int8_t
,
type
);
T_READ_MEMBER
(
*
psrc
,
int16_t
,
colId
);
T_READ_MEMBER
(
*
psrc
,
int32_t
,
bytes
);
tdSchemaAppendCol
(
pSchema
,
type
,
colId
,
bytes
);
}
return
pSchema
;
}
/**
* Initialize a data row
*/
...
...
@@ -234,6 +280,8 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) {
dataRowFLen
(
row
)
+=
TYPE_BYTES
[
colType
(
pCol
)];
break
;
}
return
0
;
}
void
tdDataRowReset
(
SDataRow
row
,
STSchema
*
pSchema
)
{
tdInitDataRow
(
row
,
pSchema
);
}
...
...
@@ -246,40 +294,6 @@ SDataRow tdDataRowDup(SDataRow row) {
return
trow
;
}
void
tdDataRowsAppendRow
(
SDataRows
rows
,
SDataRow
row
)
{
dataRowCpy
((
void
*
)((
char
*
)
rows
+
dataRowsLen
(
rows
)),
row
);
dataRowsSetLen
(
rows
,
dataRowsLen
(
rows
)
+
dataRowLen
(
row
));
}
// Initialize the iterator
void
tdInitSDataRowsIter
(
SDataRows
rows
,
SDataRowsIter
*
pIter
)
{
if
(
pIter
==
NULL
)
return
;
pIter
->
totalLen
=
dataRowsLen
(
rows
);
if
(
pIter
->
totalLen
==
TD_DATA_ROWS_HEAD_LEN
)
{
pIter
->
row
=
NULL
;
return
;
}
pIter
->
row
=
(
SDataRow
)((
char
*
)
rows
+
TD_DATA_ROWS_HEAD_LEN
);
pIter
->
len
=
TD_DATA_ROWS_HEAD_LEN
+
dataRowLen
(
pIter
->
row
);
}
// Get the next row in Rows
SDataRow
tdDataRowsNext
(
SDataRowsIter
*
pIter
)
{
SDataRow
row
=
pIter
->
row
;
if
(
row
==
NULL
)
return
NULL
;
if
(
pIter
->
len
>=
pIter
->
totalLen
)
{
pIter
->
row
=
NULL
;
}
else
{
pIter
->
row
=
(
char
*
)
row
+
dataRowLen
(
row
);
pIter
->
len
+=
dataRowLen
(
row
);
}
return
row
;
}
/**
* Return the first part length of a data row for a schema
*/
...
...
src/inc/taosdef.h
浏览文件 @
72d7f581
...
...
@@ -82,6 +82,17 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_TIME_PRECISION_MILLI_STR "ms"
#define TSDB_TIME_PRECISION_MICRO_STR "us"
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
#define T_APPEND_MEMBER(dst, ptr, type, member) \
do {\
memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member));\
dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member));\
} while(0)
#define T_READ_MEMBER(src, type, target) \
do { \
(target) = *(type *)(src); \
(src) = (void *)((char *)src + sizeof(type));\
} while(0)
#define TSDB_KEYSIZE sizeof(TSKEY)
...
...
src/vnode/tsdb/inc/tsdbMeta.h
浏览文件 @
72d7f581
...
...
@@ -35,7 +35,7 @@ extern "C" {
// ---------- TSDB TABLE DEFINITION
typedef
struct
STable
{
TSDB_TABLE_TYPE
type
;
int8_t
type
;
STableId
tableId
;
int32_t
superUid
;
// Super table UID
int32_t
sversion
;
...
...
src/vnode/tsdb/inc/tsdbMetaFile.h
浏览文件 @
72d7f581
...
...
@@ -25,15 +25,21 @@ extern "C" {
#define TSDB_META_FILE_NAME "META"
#define TSDB_META_HASH_FRACTION 1.1
typedef
int
(
*
iterFunc
)(
void
*
,
void
*
cont
,
int
contLen
);
typedef
void
(
*
afterFunc
)(
void
*
);
typedef
struct
{
int
fd
;
// File descriptor
int
nDel
;
// number of deletions
int
nRecord
;
// Number of records
int64_t
size
;
// Total file size
void
*
map
;
// Map from uid ==> position
int
fd
;
// File descriptor
int
nDel
;
// number of deletions
int
tombSize
;
// deleted size
int64_t
size
;
// Total file size
void
*
map
;
// Map from uid ==> position
iterFunc
iFunc
;
afterFunc
aFunc
;
void
*
appH
;
}
SMetaFile
;
SMetaFile
*
tsdbInitMetaFile
(
char
*
rootDir
,
int32_t
maxTables
);
SMetaFile
*
tsdbInitMetaFile
(
char
*
rootDir
,
int32_t
maxTables
,
iterFunc
iFunc
,
afterFunc
aFunc
,
void
*
appH
);
int32_t
tsdbInsertMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
,
void
*
cont
,
int32_t
contLen
);
int32_t
tsdbDeleteMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
);
int32_t
tsdbUpdateMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
,
void
*
cont
,
int32_t
contLen
);
...
...
src/vnode/tsdb/src/tsdbMain.c
浏览文件 @
72d7f581
...
...
@@ -77,8 +77,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static
int32_t
tsdbSetRepoEnv
(
STsdbRepo
*
pRepo
);
static
int32_t
tsdbDestroyRepoEnv
(
STsdbRepo
*
pRepo
);
static
int
tsdbOpenMetaFile
(
char
*
tsdbDir
);
static
int
tsdbRecoverRepo
(
int
fd
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbInsertDataToTable
(
tsdb_repo_t
*
repo
,
SSubmitBlk
*
pBlock
);
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...
...
@@ -219,25 +219,25 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
return
NULL
;
}
int
fd
=
tsdbOpenMetaFile
(
tsdbDir
);
if
(
fd
<
0
)
{
free
(
pRepo
);
return
NULL
;
}
pRepo
->
rootDir
=
strdup
(
tsdbDir
);
if
(
tsdbRecoverRepo
(
fd
,
&
(
pRepo
->
config
))
<
0
)
{
close
(
fd
);
tsdbRestoreCfg
(
pRepo
,
&
(
pRepo
->
config
));
pRepo
->
tsdbMeta
=
tsdbInitMeta
(
tsdbDir
,
pRepo
->
config
.
maxTables
);
if
(
pRepo
->
tsdbMeta
==
NULL
)
{
free
(
pRepo
->
rootDir
);
free
(
pRepo
);
return
NULL
;
}
pRepo
->
tsdbCache
=
tsdbInitCache
(
5
);
pRepo
->
tsdbCache
=
tsdbInitCache
(
pRepo
->
config
.
maxCacheSize
);
if
(
pRepo
->
tsdbCache
==
NULL
)
{
// TODO: deal with error
tsdbFreeMeta
(
pRepo
->
tsdbMeta
);
free
(
pRepo
->
rootDir
);
free
(
pRepo
);
return
NULL
;
}
pRepo
->
rootDir
=
strdup
(
tsdbDir
);
pRepo
->
state
=
TSDB_REPO_STATE_ACTIVE
;
return
(
tsdb_repo_t
*
)
pRepo
;
...
...
@@ -459,7 +459,7 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
if
(
pIter
->
len
>=
pIter
->
totalLen
)
{
pIter
->
pBlock
=
NULL
;
}
else
{
pIter
->
pBlock
=
(
char
*
)
pBlock
+
pBlock
->
len
+
sizeof
(
SSubmitBlk
);
pIter
->
pBlock
=
(
SSubmitBlk
*
)((
char
*
)
pBlock
+
pBlock
->
len
+
sizeof
(
SSubmitBlk
)
);
}
return
pBlock
;
...
...
@@ -623,12 +623,6 @@ static int tsdbOpenMetaFile(char *tsdbDir) {
return
0
;
}
static
int
tsdbRecoverRepo
(
int
fd
,
STsdbCfg
*
pCfg
)
{
// TODO: read tsdb configuration from file
// recover tsdb meta
return
0
;
}
static
int32_t
tdInsertRowToTable
(
STsdbRepo
*
pRepo
,
SDataRow
row
,
STable
*
pTable
)
{
// TODO
int32_t
level
=
0
;
...
...
src/vnode/tsdb/src/tsdbMeta.c
浏览文件 @
72d7f581
...
...
@@ -13,7 +13,7 @@
static
int
tsdbFreeTable
(
STable
*
pTable
);
static
int32_t
tsdbCheckTableCfg
(
STableCfg
*
pCfg
);
static
int
tsdbAddTableToMeta
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
static
int
tsdbAddTableToMeta
(
STsdbMeta
*
pMeta
,
STable
*
pTable
,
bool
addIdx
);
static
int
tsdbAddTableIntoMap
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
static
int
tsdbAddTableIntoIndex
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
static
int
tsdbRemoveTableFromIndex
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
...
...
@@ -39,8 +39,21 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) {
void
*
ret
=
malloc
(
*
contLen
);
if
(
ret
==
NULL
)
return
NULL
;
// TODO: encode the object to the memory
{}
void
*
ptr
=
ret
;
T_APPEND_MEMBER
(
ptr
,
pTable
,
STable
,
type
);
T_APPEND_MEMBER
(
ptr
,
&
(
pTable
->
tableId
),
STableId
,
uid
);
T_APPEND_MEMBER
(
ptr
,
&
(
pTable
->
tableId
),
STableId
,
tid
);
T_APPEND_MEMBER
(
ptr
,
pTable
,
STable
,
superUid
);
T_APPEND_MEMBER
(
ptr
,
pTable
,
STable
,
sversion
);
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
ptr
=
tdEncodeSchema
(
ptr
,
pTable
->
schema
);
ptr
=
tdEncodeSchema
(
ptr
,
pTable
->
tagSchema
);
}
else
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
dataRowCpy
(
ptr
,
pTable
->
tagVal
);
}
else
{
ptr
=
tdEncodeSchema
(
ptr
,
pTable
->
schema
);
}
return
ret
;
}
...
...
@@ -59,8 +72,20 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
STable
*
pTable
=
(
STable
*
)
calloc
(
1
,
sizeof
(
STable
));
if
(
pTable
==
NULL
)
return
NULL
;
{
// TODO recover from the binary content
void
*
ptr
=
cont
;
T_READ_MEMBER
(
ptr
,
int8_t
,
pTable
->
type
);
T_READ_MEMBER
(
ptr
,
int64_t
,
pTable
->
tableId
.
uid
);
T_READ_MEMBER
(
ptr
,
int32_t
,
pTable
->
tableId
.
tid
);
T_READ_MEMBER
(
ptr
,
int32_t
,
pTable
->
superUid
);
T_READ_MEMBER
(
ptr
,
int32_t
,
pTable
->
sversion
);
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
pTable
->
schema
=
tdDecodeSchema
(
&
ptr
);
pTable
->
tagSchema
=
tdDecodeSchema
(
&
ptr
);
}
else
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
pTable
->
tagVal
=
tdDataRowDup
(
ptr
);
}
else
{
pTable
->
schema
=
tdDecodeSchema
(
&
ptr
);
}
return
pTable
;
...
...
@@ -70,6 +95,36 @@ void *tsdbFreeEncode(void *cont) {
if
(
cont
!=
NULL
)
free
(
cont
);
}
int
tsdbRestoreTable
(
void
*
pHandle
,
void
*
cont
,
int
contLen
)
{
STsdbMeta
*
pMeta
=
(
STsdbMeta
*
)
pHandle
;
STable
*
pTable
=
tsdbDecodeTable
(
cont
,
contLen
);
if
(
pTable
==
NULL
)
return
-
1
;
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
pTable
->
content
.
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
sizeof
(
int64_t
),
1
,
0
,
getTupleKey
);
}
else
{
pTable
->
content
.
pData
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
TYPE_BYTES
[
TSDB_DATA_TYPE_TIMESTAMP
],
0
,
0
,
getTupleKey
);
}
tsdbAddTableToMeta
(
pMeta
,
pTable
,
false
);
return
0
;
}
void
tsdbOrgMeta
(
void
*
pHandle
)
{
STsdbMeta
*
pMeta
=
(
STsdbMeta
*
)
pHandle
;
for
(
int
i
=
0
;
i
<
pMeta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
!=
NULL
&&
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
tsdbAddTableIntoIndex
(
pMeta
,
pTable
);
}
}
}
/**
* Initialize the meta handle
* ASSUMPTIONS: VALID PARAMETER
...
...
@@ -94,7 +149,7 @@ STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) {
return
NULL
;
}
pMeta
->
mfh
=
tsdbInitMetaFile
(
rootDir
,
maxTables
);
pMeta
->
mfh
=
tsdbInitMetaFile
(
rootDir
,
maxTables
,
tsdbRestoreTable
,
tsdbOrgMeta
,
pMeta
);
if
(
pMeta
->
mfh
==
NULL
)
{
taosHashCleanup
(
pMeta
->
map
);
free
(
pMeta
->
tables
);
...
...
@@ -186,8 +241,21 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
}
table
->
content
.
pData
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
TYPE_BYTES
[
TSDB_DATA_TYPE_TIMESTAMP
],
0
,
0
,
getTupleKey
);
if
(
newSuper
)
tsdbAddTableToMeta
(
pMeta
,
super
);
tsdbAddTableToMeta
(
pMeta
,
table
);
// Register to meta
if
(
newSuper
)
tsdbAddTableToMeta
(
pMeta
,
super
,
true
);
tsdbAddTableToMeta
(
pMeta
,
table
,
true
);
// Write to meta file
int
bufLen
=
0
;
if
(
newSuper
)
{
void
*
buf
=
tsdbEncodeTable
(
super
,
&
bufLen
);
tsdbInsertMetaRecord
(
pMeta
->
mfh
,
super
->
tableId
.
uid
,
buf
,
bufLen
);
tsdbFreeEncode
(
buf
);
}
void
*
buf
=
tsdbEncodeTable
(
table
,
&
bufLen
);
tsdbInsertMetaRecord
(
pMeta
->
mfh
,
table
->
tableId
.
uid
,
buf
,
bufLen
);
tsdbFreeEncode
(
buf
);
return
0
;
}
...
...
@@ -268,7 +336,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
return
*
(
STable
**
)
ptr
;
}
static
int
tsdbAddTableToMeta
(
STsdbMeta
*
pMeta
,
STable
*
pTable
)
{
static
int
tsdbAddTableToMeta
(
STsdbMeta
*
pMeta
,
STable
*
pTable
,
bool
addIdx
)
{
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
// add super table to the linked list
if
(
pMeta
->
superList
==
NULL
)
{
...
...
@@ -318,8 +386,22 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
}
static
int
tsdbEstimateTableEncodeSize
(
STable
*
pTable
)
{
// TODO
return
0
;
int
size
=
0
;
size
+=
T_MEMBER_SIZE
(
STable
,
type
);
size
+=
T_MEMBER_SIZE
(
STable
,
tableId
);
size
+=
T_MEMBER_SIZE
(
STable
,
superUid
);
size
+=
T_MEMBER_SIZE
(
STable
,
sversion
);
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
size
+=
tdGetSchemaEncodeSize
(
pTable
->
schema
);
size
+=
tdGetSchemaEncodeSize
(
pTable
->
tagSchema
);
}
else
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
size
+=
dataRowLen
(
pTable
->
tagVal
);
}
else
{
size
+=
tdGetSchemaEncodeSize
(
pTable
->
schema
);
}
return
size
;
}
static
char
*
getTupleKey
(
const
void
*
data
)
{
...
...
src/vnode/tsdb/src/tsdbMetaFile.c
浏览文件 @
72d7f581
...
...
@@ -19,11 +19,14 @@
#include "hash.h"
#include "tsdbMetaFile.h"
#define TSDB_META_FILE_VERSION_MAJOR 1
#define TSDB_META_FILE_VERSION_MINOR 0
#define TSDB_META_FILE_HEADER_SIZE 512
typedef
struct
{
int32_t
offset
;
int32_t
size
;
int64_t
uid
;
}
SRecordInfo
;
static
int32_t
tsdbGetMetaFileName
(
char
*
rootDir
,
char
*
fname
);
...
...
@@ -32,14 +35,20 @@ static int32_t tsdbWriteMetaHeader(int fd);
static
int
tsdbCreateMetaFile
(
char
*
fname
);
static
int
tsdbRestoreFromMetaFile
(
char
*
fname
,
SMetaFile
*
mfh
);
SMetaFile
*
tsdbInitMetaFile
(
char
*
rootDir
,
int32_t
maxTables
)
{
// TODO
SMetaFile
*
tsdbInitMetaFile
(
char
*
rootDir
,
int32_t
maxTables
,
iterFunc
iFunc
,
afterFunc
aFunc
,
void
*
appH
)
{
char
fname
[
128
]
=
"
\0
"
;
if
(
tsdbGetMetaFileName
(
rootDir
,
fname
)
<
0
)
return
NULL
;
SMetaFile
*
mfh
=
(
SMetaFile
*
)
calloc
(
1
,
sizeof
(
SMetaFile
));
if
(
mfh
==
NULL
)
return
NULL
;
mfh
->
iFunc
=
iFunc
;
mfh
->
aFunc
=
aFunc
;
mfh
->
appH
=
appH
;
mfh
->
nDel
=
0
;
mfh
->
tombSize
=
0
;
mfh
->
size
=
0
;
// OPEN MAP
mfh
->
map
=
taosHashInit
(
maxTables
*
TSDB_META_HASH_FRACTION
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
...
...
@@ -56,6 +65,7 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables) {
free
(
mfh
);
return
NULL
;
}
mfh
->
size
+=
TSDB_META_FILE_HEADER_SIZE
;
}
else
{
// file exists, recover from file
if
(
tsdbRestoreFromMetaFile
(
fname
,
mfh
)
<
0
)
{
taosHashCleanup
(
mfh
->
map
);
...
...
@@ -74,7 +84,8 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
SRecordInfo
info
;
info
.
offset
=
mfh
->
size
;
info
.
size
=
contLen
;
// TODO: Here is not correct
info
.
size
=
contLen
;
info
.
uid
=
uid
;
mfh
->
size
+=
(
contLen
+
sizeof
(
SRecordInfo
));
...
...
@@ -83,7 +94,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
}
// TODO: make below a function to implement
if
(
lseek
(
mfh
->
fd
,
info
.
offset
,
SEEK_
CUR
)
<
0
)
{
if
(
lseek
(
mfh
->
fd
,
info
.
offset
,
SEEK_
SET
)
<
0
)
{
return
-
1
;
}
...
...
@@ -97,7 +108,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
fsync
(
mfh
->
fd
);
mfh
->
nRecord
++
;
mfh
->
tombSize
++
;
return
0
;
}
...
...
@@ -182,6 +193,15 @@ static int32_t tsdbCheckMetaHeader(int fd) {
static
int32_t
tsdbWriteMetaHeader
(
int
fd
)
{
// TODO: write the meta file header to file
char
head
[
TSDB_META_FILE_HEADER_SIZE
]
=
"
\0
"
;
sprintf
(
head
,
"version: %d.%d"
,
TSDB_META_FILE_VERSION_MAJOR
,
TSDB_META_FILE_VERSION_MINOR
);
write
(
fd
,
(
void
*
)
head
,
TSDB_META_FILE_HEADER_SIZE
);
return
0
;
}
static
int32_t
tsdbReadMetaHeader
(
int
fd
)
{
lseek
(
fd
,
TSDB_META_FILE_HEADER_SIZE
,
SEEK_SET
);
return
0
;
}
...
...
@@ -218,8 +238,44 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
return
-
1
;
}
mfh
->
size
+=
TSDB_META_FILE_HEADER_SIZE
;
mfh
->
fd
=
fd
;
// TODO: iterate to read the meta file to restore the meta data
void
*
buf
=
NULL
;
int
buf_size
=
0
;
SRecordInfo
info
;
while
(
1
)
{
if
(
read
(
mfh
->
fd
,
(
void
*
)(
&
info
),
sizeof
(
SRecordInfo
))
==
0
)
break
;
if
(
info
.
offset
<
0
)
{
mfh
->
size
+=
(
info
.
size
+
sizeof
(
SRecordInfo
));
mfh
->
tombSize
+=
(
info
.
size
+
sizeof
(
SRecordInfo
));
lseek
(
mfh
->
fd
,
info
.
size
,
SEEK_CUR
);
mfh
->
size
=
mfh
->
size
+
sizeof
(
SRecordInfo
)
+
info
.
size
;
mfh
->
tombSize
=
mfh
->
tombSize
+
sizeof
(
SRecordInfo
)
+
info
.
size
;
}
else
{
if
(
taosHashPut
(
mfh
->
map
,
(
char
*
)(
&
info
.
uid
),
sizeof
(
info
.
uid
),
(
void
*
)(
&
info
),
sizeof
(
SRecordInfo
))
<
0
)
{
if
(
buf
)
free
(
buf
);
return
-
1
;
}
buf
=
realloc
(
buf
,
info
.
size
);
if
(
buf
==
NULL
)
return
-
1
;
if
(
read
(
mfh
->
fd
,
buf
,
info
.
size
)
<
0
)
{
if
(
buf
)
free
(
buf
);
return
-
1
;
}
(
*
mfh
->
iFunc
)(
mfh
->
appH
,
buf
,
info
.
size
);
mfh
->
size
=
mfh
->
size
+
sizeof
(
SRecordInfo
)
+
info
.
size
;
}
}
(
*
mfh
->
aFunc
)(
mfh
->
appH
);
if
(
buf
)
free
(
buf
);
return
0
;
}
\ No newline at end of file
src/vnode/tsdb/tests/tsdbTests.cpp
浏览文件 @
72d7f581
...
...
@@ -3,6 +3,44 @@
#include "tsdb.h"
#include "dataformat.h"
#include "tsdbMeta.h"
TEST
(
TsdbTest
,
tableEncodeDecode
)
{
STable
*
pTable
=
(
STable
*
)
malloc
(
sizeof
(
STable
));
pTable
->
type
=
TSDB_NORMAL_TABLE
;
pTable
->
tableId
.
uid
=
987607499877672L
;
pTable
->
tableId
.
tid
=
0
;
pTable
->
superUid
=
-
1
;
pTable
->
sversion
=
0
;
pTable
->
tagSchema
=
NULL
;
pTable
->
tagVal
=
NULL
;
int
nCols
=
5
;
STSchema
*
schema
=
tdNewSchema
(
nCols
);
for
(
int
i
=
0
;
i
<
nCols
;
i
++
)
{
if
(
i
==
0
)
{
tdSchemaAppendCol
(
schema
,
TSDB_DATA_TYPE_TIMESTAMP
,
i
,
-
1
);
}
else
{
tdSchemaAppendCol
(
schema
,
TSDB_DATA_TYPE_INT
,
i
,
-
1
);
}
}
pTable
->
schema
=
schema
;
int
bufLen
=
0
;
void
*
buf
=
tsdbEncodeTable
(
pTable
,
&
bufLen
);
STable
*
tTable
=
tsdbDecodeTable
(
buf
,
bufLen
);
ASSERT_EQ
(
pTable
->
type
,
tTable
->
type
);
ASSERT_EQ
(
pTable
->
tableId
.
uid
,
tTable
->
tableId
.
uid
);
ASSERT_EQ
(
pTable
->
tableId
.
tid
,
tTable
->
tableId
.
tid
);
ASSERT_EQ
(
pTable
->
superUid
,
tTable
->
superUid
);
ASSERT_EQ
(
pTable
->
sversion
,
tTable
->
sversion
);
ASSERT_EQ
(
memcmp
(
pTable
->
schema
,
tTable
->
schema
,
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
nCols
),
0
);
ASSERT_EQ
(
tTable
->
content
.
pData
,
nullptr
);
}
TEST
(
TsdbTest
,
createRepo
)
{
STsdbCfg
config
;
...
...
@@ -65,3 +103,7 @@ TEST(TsdbTest, createRepo) {
int
k
=
0
;
}
TEST
(
TsdbTest
,
openRepo
)
{
tsdb_repo_t
*
pRepo
=
tsdbOpenRepo
(
"/home/ubuntu/work/ttest/vnode0"
);
ASSERT_NE
(
pRepo
,
nullptr
);
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录