Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
147239d5
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
147239d5
编写于
3月 14, 2020
作者:
S
slguan
提交者:
GitHub
3月 14, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1366 from taosdata/feature/2.0tsdb
Feature/2.0tsdb
上级
caee36aa
e85c8d51
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
994 addition
and
415 deletion
+994
-415
src/common/inc/dataformat.h
src/common/inc/dataformat.h
+24
-15
src/common/src/dataformat.c
src/common/src/dataformat.c
+97
-56
src/common/src/ttypes.c
src/common/src/ttypes.c
+2
-2
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+3
-3
src/vnode/tsdb/CMakeLists.txt
src/vnode/tsdb/CMakeLists.txt
+1
-1
src/vnode/tsdb/inc/tsdb.h
src/vnode/tsdb/inc/tsdb.h
+83
-104
src/vnode/tsdb/inc/tsdbCache.h
src/vnode/tsdb/inc/tsdbCache.h
+1
-1
src/vnode/tsdb/inc/tsdbFile.h
src/vnode/tsdb/inc/tsdbFile.h
+4
-2
src/vnode/tsdb/inc/tsdbMeta.h
src/vnode/tsdb/inc/tsdbMeta.h
+29
-52
src/vnode/tsdb/inc/tsdbMetaFile.h
src/vnode/tsdb/inc/tsdbMetaFile.h
+46
-0
src/vnode/tsdb/src/tsdbCache.c
src/vnode/tsdb/src/tsdbCache.c
+1
-1
src/vnode/tsdb/src/tsdbFile.c
src/vnode/tsdb/src/tsdbFile.c
+46
-0
src/vnode/tsdb/src/tsdbMain.c
src/vnode/tsdb/src/tsdbMain.c
+265
-42
src/vnode/tsdb/src/tsdbMeta.c
src/vnode/tsdb/src/tsdbMeta.c
+123
-63
src/vnode/tsdb/src/tsdbMetaFile.c
src/vnode/tsdb/src/tsdbMetaFile.c
+225
-0
src/vnode/tsdb/tests/tsdbTests.cpp
src/vnode/tsdb/tests/tsdbTests.cpp
+44
-73
未找到文件。
src/common/inc/dataformat.h
浏览文件 @
147239d5
...
...
@@ -51,44 +51,53 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes);
// ----------------- TSDB SCHEMA DEFINITION
typedef
struct
{
int
32_t
numOfCols
;
int
32_t
padding
;
// TODO: replace the padding for useful variable
int
numOfCols
;
// Number of columns appended
int
totalCols
;
// Total columns allocated
STColumn
columns
[];
}
STSchema
;
#define schemaNCols(s) ((s)->numOfCols)
#define schemaTCols(s) ((s)->totalCols)
#define schemaColAt(s, i) ((s)->columns + i)
STSchema
*
tdNewSchema
(
int32_t
nCols
);
int
tdSchemaAppendCol
(
STSchema
*
pSchema
,
int8_t
type
,
int16_t
colId
,
int16_t
bytes
);
STSchema
*
tdDupSchema
(
STSchema
*
pSchema
);
void
tdFreeSchema
(
STSchema
*
pSchema
);
void
tdUpdateSchema
(
STSchema
*
pSchema
);
void
tdFreeSchema
(
STSchema
*
pSchema
);
void
tdUpdateSchema
(
STSchema
*
pSchema
);
// ----------------- Data row structure
/* A data row, the format is like below:
* +---------+---------------------------------+
* | int32_t | |
* +---------+---------------------------------+
* | len | row |
* +---------+---------------------------------+
* +----------+---------+---------------------------------+---------------------------------+
* | int32_t | int32_t | | |
* +----------+---------+---------------------------------+---------------------------------+
* | len | flen | First part | Second part |
* +----------+---------+---------------------------------+---------------------------------+
* plen: first part length
* len: the length including sizeof(row) + sizeof(len)
* row: actual row data encoding
*/
typedef
void
*
SDataRow
;
#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
#define dataRowLen(r) (*(int32_t *)(r))
#define dataRowTuple(r) ((char *)(r) + sizeof(int32_t))
#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t)))
#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
#define dataRowIdx(r, i) ((char *)(r) + i)
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
#define dataRowAt(r, idx) ((char *)(r) + (idx))
SDataRow
tdNewDataRow
(
int32_t
bytes
);
// SDataRow tdNewDdataFromSchema(SSchema *pSchema);
void
tdInitDataRow
(
SDataRow
row
,
STSchema
*
pSchema
);
int
tdMaxRowBytesFromSchema
(
STSchema
*
pSchema
);
SDataRow
tdNewDataRow
(
int32_t
bytes
,
STSchema
*
pSchema
);
SDataRow
tdNewDataRowFromSchema
(
STSchema
*
pSchema
);
void
tdFreeDataRow
(
SDataRow
row
);
// int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset);
void
tdDataRowCpy
(
void
*
dst
,
SDataRow
row
);
void
tdDataRowReset
(
SDataRow
row
);
int
tdAppendColVal
(
SDataRow
row
,
void
*
value
,
STColumn
*
pCol
);
void
tdDataRowReset
(
SDataRow
row
,
STSchema
*
pSchema
);
SDataRow
tdDataRowDup
(
SDataRow
row
);
/* Data rows definition, the format of it is like below:
...
...
src/common/src/dataformat.c
浏览文件 @
147239d5
...
...
@@ -14,6 +14,8 @@
*/
#include "dataformat.h"
static
int
tdFLenFromSchema
(
STSchema
*
pSchema
);
/**
* Create a new STColumn object
* ASSUMPTIONS: VALID PARAMETERS
...
...
@@ -89,13 +91,40 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) {
STSchema
*
tdNewSchema
(
int32_t
nCols
)
{
int32_t
size
=
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
nCols
;
STSchema
*
pSchema
=
(
STSchema
*
)
calloc
(
1
,
size
);
STSchema
*
pSchema
=
(
STSchema
*
)
malloc
(
size
);
if
(
pSchema
==
NULL
)
return
NULL
;
pSchema
->
numOfCols
=
nCols
;
pSchema
->
numOfCols
=
0
;
pSchema
->
totalCols
=
nCols
;
return
pSchema
;
}
/**
* Append a column to the schema
*/
int
tdSchemaAppendCol
(
STSchema
*
pSchema
,
int8_t
type
,
int16_t
colId
,
int16_t
bytes
)
{
if
(
pSchema
->
numOfCols
>=
pSchema
->
totalCols
)
return
-
1
;
if
(
!
isValidDataType
(
type
,
0
))
return
-
1
;
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
schemaNCols
(
pSchema
));
colSetType
(
pCol
,
type
);
colSetColId
(
pCol
,
colId
);
colSetOffset
(
pCol
,
-
1
);
switch
(
type
)
{
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
colSetBytes
(
pCol
,
bytes
);
break
;
default:
colSetBytes
(
pCol
,
TYPE_BYTES
[
type
]);
break
;
}
pSchema
->
numOfCols
++
;
return
0
;
}
/**
* Duplicate the schema and return a new object
*/
...
...
@@ -130,6 +159,14 @@ void tdUpdateSchema(STSchema *pSchema) {
}
}
/**
* Initialize a data row
*/
void
tdInitDataRow
(
SDataRow
row
,
STSchema
*
pSchema
)
{
dataRowSetFLen
(
row
,
TD_DATA_ROW_HEAD_SIZE
);
dataRowSetLen
(
row
,
TD_DATA_ROW_HEAD_SIZE
+
tdFLenFromSchema
(
pSchema
));
}
/**
* Create a data row with maximum row length bytes.
*
...
...
@@ -140,21 +177,37 @@ void tdUpdateSchema(STSchema *pSchema) {
* @return SDataRow object for success
* NULL for failure
*/
SDataRow
tdNewDataRow
(
int32_t
bytes
)
{
SDataRow
tdNewDataRow
(
int32_t
bytes
,
STSchema
*
pSchema
)
{
int32_t
size
=
sizeof
(
int32_t
)
+
bytes
;
SDataRow
row
=
malloc
(
size
);
if
(
row
==
NULL
)
return
NULL
;
dataRowSetLen
(
row
,
sizeof
(
int32_t
)
);
tdInitDataRow
(
row
,
pSchema
);
return
row
;
}
// SDataRow tdNewDdataFromSchema(SSchema *pSchema) {
// int32_t bytes = tdMaxRowDataBytes(pSchema);
// return tdNewDataRow(bytes);
// }
/**
* Get maximum bytes a data row from a schema
* ASSUMPTIONS: VALID PARAMETER
*/
int
tdMaxRowBytesFromSchema
(
STSchema
*
pSchema
)
{
// TODO
int
bytes
=
TD_DATA_ROW_HEAD_SIZE
;
for
(
int
i
=
0
;
i
<
schemaNCols
(
pSchema
);
i
++
)
{
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
i
);
bytes
+=
TYPE_BYTES
[
pCol
->
type
];
if
(
pCol
->
type
==
TSDB_DATA_TYPE_BINARY
||
pCol
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
bytes
+=
pCol
->
bytes
;
}
}
return
bytes
;
}
SDataRow
tdNewDataRowFromSchema
(
STSchema
*
pSchema
)
{
return
tdNewDataRow
(
tdMaxRowBytesFromSchema
(
pSchema
),
pSchema
);
}
/**
* Free the SDataRow object
...
...
@@ -164,62 +217,37 @@ void tdFreeDataRow(SDataRow row) {
}
/**
* Append a column value to a SDataRow object.
* NOTE: THE APPLICATION SHOULD MAKE SURE VALID PARAMETERS. THE FUNCTION ASSUMES
* THE ROW OBJECT HAS ENOUGH SPACE TO HOLD THE VALUE.
*
* @param row the row to append value to
* @param value value pointer to append
* @param pSchema schema
* @param colIdx column index
*
* @return 0 for success and -1 for failure
*/
// int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset) {
// int32_t offset;
// switch (pCol->type) {
// case TD_DATATYPE_BOOL:
// case TD_DATATYPE_TINYINT:
// case TD_DATATYPE_SMALLINT:
// case TD_DATATYPE_INT:
// case TD_DATATYPE_BIGINT:
// case TD_DATATYPE_FLOAT:
// case TD_DATATYPE_DOUBLE:
// case TD_DATATYPE_TIMESTAMP:
// memcpy(dataRowIdx(row, pCol->offset + sizeof(int32_t)), value, rowDataLen[pCol->type]);
// if (dataRowLen(row) < suffixOffset + sizeof(int32_t))
// dataRowSetLen(row, dataRowLen(row) + rowDataLen[pCol->type]);
// break;
// case TD_DATATYPE_VARCHAR:
// offset = dataRowLen(row) > suffixOffset ? dataRowLen(row) : suffixOffset;
// memcpy(dataRowIdx(row, pCol->offset+sizeof(int32_t)), (void *)(&offset), sizeof(offset));
// case TD_DATATYPE_NCHAR:
// case TD_DATATYPE_BINARY:
// break;
// default:
// return -1;
// }
// return 0;
// }
/**
* Copy a data row to a destination
* ASSUMPTIONS: dst has enough room for a copy of row
* Append a column value to the data row
*/
void
tdDataRowCpy
(
void
*
dst
,
SDataRow
row
)
{
memcpy
(
dst
,
row
,
dataRowLen
(
row
));
}
void
tdDataRowReset
(
SDataRow
row
)
{
dataRowSetLen
(
row
,
sizeof
(
int32_t
));
}
int
tdAppendColVal
(
SDataRow
row
,
void
*
value
,
STColumn
*
pCol
)
{
switch
(
colType
(
pCol
))
{
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
*
(
int32_t
*
)
dataRowAt
(
row
,
dataRowFLen
(
row
))
=
dataRowLen
(
row
);
dataRowFLen
(
row
)
+=
TYPE_BYTES
[
colType
(
pCol
)];
memcpy
((
void
*
)
dataRowAt
(
row
,
dataRowLen
(
row
)),
value
,
strlen
(
value
));
dataRowLen
(
row
)
+=
strlen
(
value
);
break
;
default:
memcpy
(
dataRowAt
(
row
,
dataRowFLen
(
row
)),
value
,
TYPE_BYTES
[
colType
(
pCol
)]);
dataRowFLen
(
row
)
+=
TYPE_BYTES
[
colType
(
pCol
)];
break
;
}
}
void
tdDataRowReset
(
SDataRow
row
,
STSchema
*
pSchema
)
{
tdInitDataRow
(
row
,
pSchema
);
}
SDataRow
tdDataRowDup
(
SDataRow
row
)
{
SDataRow
trow
=
tdNewDataRow
(
dataRowLen
(
row
));
SDataRow
trow
=
malloc
(
dataRowLen
(
row
));
if
(
trow
==
NULL
)
return
NULL
;
dataRowCpy
(
trow
,
row
);
return
row
;
return
t
row
;
}
void
tdDataRowsAppendRow
(
SDataRows
rows
,
SDataRow
row
)
{
tdD
ataRowCpy
((
void
*
)((
char
*
)
rows
+
dataRowsLen
(
rows
)),
row
);
d
ataRowCpy
((
void
*
)((
char
*
)
rows
+
dataRowsLen
(
rows
)),
row
);
dataRowsSetLen
(
rows
,
dataRowsLen
(
rows
)
+
dataRowLen
(
row
));
}
...
...
@@ -250,4 +278,17 @@ SDataRow tdDataRowsNext(SDataRowsIter *pIter) {
}
return
row
;
}
/**
* Return the first part length of a data row for a schema
*/
static
int
tdFLenFromSchema
(
STSchema
*
pSchema
)
{
int
ret
=
0
;
for
(
int
i
=
0
;
i
<
schemaNCols
(
pSchema
);
i
++
)
{
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
i
);
ret
+=
TYPE_BYTES
[
pCol
->
type
];
}
return
ret
;
}
\ No newline at end of file
src/common/src/ttypes.c
浏览文件 @
147239d5
...
...
@@ -26,9 +26,9 @@ const int32_t TYPE_BYTES[11] = {
sizeof
(
int64_t
),
// TSDB_DATA_TYPE_BIGINT
sizeof
(
float
),
// TSDB_DATA_TYPE_FLOAT
sizeof
(
double
),
// TSDB_DATA_TYPE_DOUBLE
-
1
,
// TSDB_DATA_TYPE_BINARY
sizeof
(
int32_t
),
// TSDB_DATA_TYPE_BINARY
sizeof
(
TSKEY
),
// TSDB_DATA_TYPE_TIMESTAMP
-
1
// TSDB_DATA_TYPE_NCHAR
sizeof
(
int32_t
)
// TSDB_DATA_TYPE_NCHAR
};
tDataTypeDescriptor
tDataTypeDesc
[
11
]
=
{
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
147239d5
...
...
@@ -91,9 +91,9 @@ int32_t dnodeInitMgmt() {
SVnodeObj
*
pVnode
=
dnodeGetVnode
(
cfg
.
cfg
.
vgId
);
dnodeDropVnode
(
pVnode
);
dnodeCreateVnode
(
&
cfg
);
SVnodeObj
*
pVnode
=
dnodeGetVnode
(
cfg
.
cfg
.
vgId
);
dnodeCleanupVnodes
();
//
dnodeCreateVnode(&cfg);
//
SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
//
dnodeCleanupVnodes();
dnodeOpenVnodes
();
dnodeCleanupVnodes
();
...
...
src/vnode/tsdb/CMakeLists.txt
浏览文件 @
147239d5
...
...
@@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES
(
tsdb common tutil
)
# Someone has no gtest directory, so comment it
#
ADD_SUBDIRECTORY(tests)
ADD_SUBDIRECTORY
(
tests
)
ENDIF
()
src/vnode/tsdb/inc/tsdb.h
浏览文件 @
147239d5
...
...
@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if
!defined(_TD_TSDB_H_)
#if
ndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include <pthread.h>
...
...
@@ -30,43 +30,104 @@ extern "C" {
#define TSDB_VERSION_MAJOR 1
#define TSDB_VERSION_MINOR 0
#define TSDB_INVALID_SUPER_TABLE_ID -1
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
enum
{
TSDB_PRECISION_MILLI
,
TSDB_PRECISION_MICRO
,
TSDB_PRECISION_NANO
};
typedef
enum
{
TSDB_SUPER_TABLE
,
// super table
TSDB_NTABLE
,
// table not created from super table
TSDB_STABLE
// table created from super table
}
TSDB_TABLE_TYPE
;
typedef
struct
{
int8_t
precision
;
int32_t
vgId
;
int32_t
tsdbId
;
int32_t
maxTables
;
// maximum number of tables this repository can have
int32_t
daysPerFile
;
// day per file sharding policy
int32_t
minRowsPerFileBlock
;
// minimum rows per file block
int32_t
maxRowsPerFileBlock
;
// maximum rows per file block
int32_t
keep
;
// day of data to keep
int64_t
maxCacheSize
;
// maximum cache size this TSDB can use
}
STsdbCfg
;
void
tsdbSetDefaultCfg
(
STsdbCfg
*
pCfg
);
STsdbCfg
*
tsdbCreateDefaultCfg
();
void
tsdbFreeCfg
(
STsdbCfg
*
pCfg
);
// --------- TSDB REPOSITORY DEFINITION
typedef
void
tsdb_repo_t
;
// use void to hide implementation details from outside
tsdb_repo_t
*
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
,
void
*
limiter
);
int32_t
tsdbDropRepo
(
tsdb_repo_t
*
repo
);
tsdb_repo_t
*
tsdbOpenRepo
(
char
*
tsdbDir
);
int32_t
tsdbCloseRepo
(
tsdb_repo_t
*
repo
);
int32_t
tsdbConfigRepo
(
tsdb_repo_t
*
repo
,
STsdbCfg
*
pCfg
);
// --------- TSDB TABLE DEFINITION
typedef
struct
{
int64_t
uid
;
// the unique table ID
int32_t
tid
;
// the table ID in the repository.
}
STableId
;
//
Submit message for this TSDB
//
--------- TSDB TABLE configuration
typedef
struct
{
int32_t
numOfTables
;
int32_t
compressed
;
char
data
[];
}
SSubmitMsg
;
TSDB_TABLE_TYPE
type
;
STableId
tableId
;
int64_t
superUid
;
STSchema
*
schema
;
STSchema
*
tagSchema
;
SDataRow
tagValues
;
}
STableCfg
;
int
tsdbInitTableCfg
(
STableCfg
*
config
,
TSDB_TABLE_TYPE
type
,
int64_t
uid
,
int32_t
tid
);
int
tsdbTableSetSuperUid
(
STableCfg
*
config
,
int64_t
uid
);
int
tsdbTableSetSchema
(
STableCfg
*
config
,
STSchema
*
pSchema
,
bool
dup
);
int
tsdbTableSetTagSchema
(
STableCfg
*
config
,
STSchema
*
pSchema
,
bool
dup
);
int
tsdbTableSetTagValue
(
STableCfg
*
config
,
SDataRow
row
,
bool
dup
);
void
tsdbClearTableCfg
(
STableCfg
*
config
);
int
tsdbCreateTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
int
tsdbDropTable
(
tsdb_repo_t
*
pRepo
,
STableId
tableId
);
int
tsdbAlterTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
// Submit message for one table
typedef
struct
{
STableId
tableId
;
int32_t
padding
;
// TODO just for padding here
int32_t
sversion
;
// data schema version
int32_t
len
;
//
message length
int32_t
len
;
//
data part length, not including the SSubmitBlk head
char
data
[];
}
SSubmitBl
oc
k
;
}
SSubmitBlk
;
enum
{
TSDB_PRECISION_MILLI
,
TSDB_PRECISION_MICRO
,
TSDB_PRECISION_NANO
};
typedef
struct
{
int32_t
totalLen
;
int32_t
len
;
SDataRow
row
;
}
SSubmitBlkIter
;
// the TSDB repository configuration
int
tsdbInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
SDataRow
tsdbGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
// Submit message for this TSDB
typedef
struct
{
int8_t
precision
;
int32_t
vgId
;
int32_t
tsdbId
;
int32_t
maxTables
;
// maximum number of tables this repository can have
int32_t
daysPerFile
;
// day per file sharding policy
int32_t
minRowsPerFileBlock
;
// minimum rows per file block
int32_t
maxRowsPerFileBlock
;
// maximum rows per file block
int32_t
keep
;
// day of data to keep
int64_t
maxCacheSize
;
// maximum cache size this TSDB can use
}
STsdbCfg
;
int32_t
length
;
int32_t
compressed
;
SSubmitBlk
blocks
[];
}
SSubmitMsg
;
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
// SSubmitMsg Iterator
typedef
struct
{
int32_t
totalLen
;
int32_t
len
;
SSubmitBlk
*
pBlock
;
}
SSubmitMsgIter
;
int
tsdbInitSubmitMsgIter
(
SSubmitMsg
*
pMsg
,
SSubmitMsgIter
*
pIter
);
SSubmitBlk
*
tsdbGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
);
// the TSDB repository info
typedef
struct
STsdbRepoInfo
{
...
...
@@ -76,22 +137,7 @@ typedef struct STsdbRepoInfo {
int64_t
tsdbTotalDiskSize
;
// the total disk size taken by this TSDB repository
// TODO: Other informations to add
}
STsdbRepoInfo
;
// the meter configuration
typedef
struct
{
STableId
tableId
;
int64_t
stableUid
;
int64_t
createdTime
;
int32_t
numOfCols
;
// number of columns. For table form super table, not includes the tag schema
STSchema
*
schema
;
// If numOfCols == schema_->numOfCols, it is a normal table, stableName = NULL
// If numOfCols < schema->numOfCols, it is a table created from super table
// assert(numOfCols <= schema->numOfCols);
SDataRow
tagValues
;
// NULL if it is normal table
// otherwise, it contains the tag values.
}
STableCfg
;
STsdbRepoInfo
*
tsdbGetStatus
(
tsdb_repo_t
*
pRepo
);
// the meter information report structure
typedef
struct
{
...
...
@@ -100,70 +146,7 @@ typedef struct {
int64_t
tableTotalDataSize
;
// In bytes
int64_t
tableTotalDiskSize
;
// In bytes
}
STableInfo
;
/**
* Create a configuration for TSDB default
* @return a pointer to a configuration. the configuration must call tsdbFreeCfg to free memory after usage
*/
STsdbCfg
*
tsdbCreateDefaultCfg
();
/**
* Free
*/
void
tsdbFreeCfg
(
STsdbCfg
*
pCfg
);
/**
* Create a new TSDB repository
* @param rootDir the TSDB repository root directory
* @param pCfg the TSDB repository configuration, upper layer to free the pointer
*
* @return a TSDB repository handle on success, NULL for failure and the error number is set
*/
tsdb_repo_t
*
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
,
void
*
limiter
);
/**
* Close and free all resources taken by the repository
* @param repo the TSDB repository handle. The interface will free the handle too, so upper
* layer do NOT need to free the repo handle again.
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbDropRepo
(
tsdb_repo_t
*
repo
);
/**
* Open an existing TSDB storage repository
* @param tsdbDir the existing TSDB root directory
*
* @return a TSDB repository handle on success, NULL for failure and the error number is set
*/
tsdb_repo_t
*
tsdbOpenRepo
(
char
*
tsdbDir
);
/**
* Close a TSDB repository. Only free memory resources, and keep the files.
* @param repo the opened TSDB repository handle. The interface will free the handle too, so upper
* layer do NOT need to free the repo handle again.
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbCloseRepo
(
tsdb_repo_t
*
repo
);
/**
* Change the configuration of a repository
* @param pCfg the repository configuration, the upper layer should free the pointer
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbConfigRepo
(
tsdb_repo_t
*
repo
,
STsdbCfg
*
pCfg
);
/**
* Get the TSDB repository information, including some statistics
* @param pRepo the TSDB repository handle
* @param error the error number to set when failure occurs
*
* @return a info struct handle on success, NULL for failure and the error number is set. The upper
* layers should free the info handle themselves or memory leak will occur
*/
STsdbRepoInfo
*
tsdbGetStatus
(
tsdb_repo_t
*
pRepo
);
STableInfo
*
tsdbGetTableInfo
(
tsdb_repo_t
*
pRepo
,
STableId
tid
);
// -- For table manipulation
...
...
@@ -174,8 +157,6 @@ STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo);
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbCreateTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
int32_t
tsdbAlterTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
/**
* Drop a table in a repository and free all the resources it takes
...
...
@@ -185,7 +166,6 @@ int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbDropTable
(
tsdb_repo_t
*
pRepo
,
STableId
tableId
);
/**
* Get the information of a table in the repository
...
...
@@ -195,7 +175,6 @@ int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
*
* @return a table information handle for success, NULL for failure and the error number is set
*/
STableInfo
*
tsdbGetTableInfo
(
tsdb_repo_t
*
pRepo
,
STableId
tid
);
// -- FOR INSERT DATA
/**
...
...
src/vnode/tsdb/inc/tsdbCache.h
浏览文件 @
147239d5
...
...
@@ -53,7 +53,7 @@ typedef struct STSDBCache {
#define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next)
#define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev)
STsdbCache
*
tsdb
CreateCache
(
int32_t
numOfBlocks
);
STsdbCache
*
tsdb
InitCache
(
int64_t
maxSize
);
int32_t
tsdbFreeCache
(
STsdbCache
*
pCache
);
void
*
tsdbAllocFromCache
(
STsdbCache
*
pCache
,
int64_t
bytes
);
...
...
src/vnode/tsdb/inc/tsdbFile.h
浏览文件 @
147239d5
...
...
@@ -16,7 +16,8 @@
#define _TD_TSDB_FILE_H_
#include <stdint.h>
// #include "tstring.h"
#include "taosdef.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -34,7 +35,8 @@ typedef enum {
extern
const
char
*
tsdbFileSuffix
[];
typedef
struct
{
int64_t
fileSize
;
int64_t
size
;
int64_t
tombSize
;
}
SFileInfo
;
typedef
struct
{
...
...
src/vnode/tsdb/inc/tsdbMeta.h
浏览文件 @
147239d5
...
...
@@ -20,6 +20,7 @@
#include "tsdb.h"
#include "dataformat.h"
#include "tskiplist.h"
#include "tsdbMetaFile.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -30,62 +31,47 @@ extern "C" {
// Initially, there are 4 tables
#define TSDB_INIT_NUMBER_OF_SUPER_TABLE 4
typedef
enum
{
TSDB_SUPER_TABLE
,
// super table
TSDB_NTABLE
,
// table not created from super table
TSDB_STABLE
// table created from super table
}
TSDB_TABLE_TYPE
;
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
// ---------- TSDB TABLE DEFINITION
typedef
struct
STable
{
STableId
tableId
;
TSDB_TABLE_TYPE
type
;
int64_t
createdTime
;
// super table UID -1 for normal table
int32_t
stableUid
;
int32_t
numOfCols
;
// Schema for this table
// For TSDB_SUPER_TABLE, it is the schema including tags
// For TSDB_NTABLE, it is only the schema, not including tags
// For TSDB_STABLE, it is NULL
STSchema
*
pSchema
;
// Tag value for this table
// For TSDB_SUPER_TABLE and TSDB_NTABLE, it is NULL
// For TSDB_STABLE, it is the tag value string
SDataRow
pTagVal
;
// Object content;
// For TSDB_SUPER_TABLE, it is the index of tables created from it
// For TSDB_STABLE and TSDB_NTABLE, it is the cache data
STableId
tableId
;
int32_t
superUid
;
// Super table UID
STSchema
*
schema
;
STSchema
*
tagSchema
;
SDataRow
tagVal
;
union
{
void
*
pData
;
void
*
pIndex
;
void
*
pData
;
// For TSDB_NTABLE and TSDB_STABLE, it is the skiplist for cache data
void
*
pIndex
;
// For TSDB_SUPER_TABLE, it is the skiplist index
}
content
;
void
*
eventHandler
;
// TODO
void
*
streamHandler
;
// TODO
struct
STable
*
next
;
// TODO: remove the next
}
STable
;
// A handle to deal with event
void
*
eventHandler
;
void
*
tsdbEncodeTable
(
STable
*
pTable
,
int
*
contLen
);
STable
*
tsdbDecodeTable
(
void
*
cont
,
int
contLen
);
void
*
tsdbFreeEncode
(
void
*
cont
);
// A handle to deal with stream
void
*
streamHandler
;
// ---------- TSDB META HANDLE DEFINITION
typedef
struct
{
int32_t
maxTables
;
// Max number of tables
struct
STable
*
next
;
int32_t
nTables
;
// Tables created
}
STable
;
STable
**
tables
;
// table array
typedef
struct
{
int32_t
maxTables
;
int32_t
nTables
;
STable
**
tables
;
// array of normal tables
STable
*
stables
;
// linked list of super tables // TODO use container to implement this
void
*
tableMap
;
// hash map of uid ==> STable *
STable
*
superList
;
// super table list TODO: change it to list container
void
*
map
;
// table map of (uid ===> table)
SMetaFile
*
mfh
;
// meta file handle
}
STsdbMeta
;
STsdbMeta
*
tsdbInitMeta
(
const
char
*
rootDir
,
int32_t
maxTables
);
int32_t
tsdbFreeMeta
(
STsdbMeta
*
pMeta
);
// ---- Operation on STable
#define TSDB_TABLE_ID(pTable) ((pTable)->tableId)
#define TSDB_TABLE_UID(pTable) ((pTable)->uid)
...
...
@@ -97,21 +83,12 @@ typedef struct {
#define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData)
#define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex)
STSchema
*
tsdbGetTableSchema
(
STable
*
pTable
);
// ---- Operation on SMetaHandle
#define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables)
#define TSDB_NUM_OF_SUPER_TABLES(pHandle) ((pHandle)->numOfSuperTables)
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name)
/* TODO */
// Create a new meta handle with configuration
STsdbMeta
*
tsdbCreateMeta
(
int32_t
maxTables
);
int32_t
tsdbFreeMeta
(
STsdbMeta
*
pMeta
);
// Recover the meta handle from the file
STsdbMeta
*
tsdbOpenMeta
(
char
*
tsdbDir
);
int32_t
tsdbCreateTableImpl
(
STsdbMeta
*
pMeta
,
STableCfg
*
pCfg
);
int32_t
tsdbDropTableImpl
(
STsdbMeta
*
pMeta
,
STableId
tableId
);
STable
*
tsdbIsValidTableToInsert
(
STsdbMeta
*
pMeta
,
STableId
tableId
);
...
...
src/vnode/tsdb/inc/tsdbMetaFile.h
0 → 100644
浏览文件 @
147239d5
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TSDB_META_FILE_
#define _TSDB_META_FILE_
#include <stdint.h>
#ifdef __cplusplus
extern
"C"
{
#endif
#define TSDB_META_FILE_NAME "META"
#define TSDB_META_HASH_FRACTION 1.1
typedef
struct
{
int
fd
;
// File descriptor
int
nDel
;
// number of deletions
int
nRecord
;
// Number of records
int64_t
size
;
// Total file size
void
*
map
;
// Map from uid ==> position
}
SMetaFile
;
SMetaFile
*
tsdbInitMetaFile
(
char
*
rootDir
,
int32_t
maxTables
);
int32_t
tsdbInsertMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
,
void
*
cont
,
int32_t
contLen
);
int32_t
tsdbDeleteMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
);
int32_t
tsdbUpdateMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
,
void
*
cont
,
int32_t
contLen
);
void
tsdbCloseMetaFile
(
SMetaFile
*
mfh
);
#ifdef __cplusplus
}
#endif
#endif // _TSDB_META_FILE_
\ No newline at end of file
src/vnode/tsdb/src/tsdbCache.c
浏览文件 @
147239d5
...
...
@@ -16,7 +16,7 @@
#include "tsdbCache.h"
STsdbCache
*
tsdb
CreateCache
(
int32_t
numOfBlocks
)
{
STsdbCache
*
tsdb
InitCache
(
int64_t
maxSize
)
{
STsdbCache
*
pCacheHandle
=
(
STsdbCache
*
)
malloc
(
sizeof
(
STsdbCache
));
if
(
pCacheHandle
==
NULL
)
{
// TODO : deal with the error
...
...
src/vnode/tsdb/src/tsdbFile.c
浏览文件 @
147239d5
...
...
@@ -18,6 +18,52 @@
#include "tsdbFile.h"
typedef
struct
{
int64_t
offset
;
}
SCompHeader
;
typedef
struct
{
int64_t
uid
;
int64_t
last
:
1
;
int64_t
numOfBlocks
:
63
;
int32_t
delimiter
;
}
SCompInfo
;
typedef
struct
{
TSKEY
keyFirst
;
TSKEY
keyLast
;
int32_t
numOfBlocks
;
int32_t
offset
;
}
SCompIdx
;
typedef
struct
{
TSKEY
keyFirst
;
TSKEY
keyLast
;
int64_t
offset
;
int32_t
len
;
int32_t
sversion
;
}
SCompBlock
;
typedef
struct
{
int64_t
uid
;
}
SBlock
;
typedef
struct
{
int16_t
colId
;
int16_t
bytes
;
int32_t
nNullPoints
;
int32_t
type
:
8
;
int32_t
offset
:
24
;
int32_t
len
;
// fields for pre-aggregate
// TODO: pre-aggregation should be seperated
int64_t
sum
;
int64_t
max
;
int64_t
min
;
int16_t
maxIdx
;
int16_t
minIdx
;
}
SField
;
const
char
*
tsdbFileSuffix
[]
=
{
".head"
,
// TSDB_FILE_TYPE_HEAD
".data"
,
// TSDB_FILE_TYPE_DATA
...
...
src/vnode/tsdb/src/tsdbMain.c
浏览文件 @
147239d5
...
...
@@ -42,6 +42,9 @@
#define TSDB_MIN_CACHE_SIZE (4 * 1024 * 1024) // 4M
#define TSDB_MAX_CACHE_SIZE (1024 * 1024 * 1024) // 1G
#define TSDB_CFG_FILE_NAME "CONFIG"
#define TSDB_DATA_DIR_NAME "data"
enum
{
TSDB_REPO_STATE_ACTIVE
,
TSDB_REPO_STATE_CLOSED
,
TSDB_REPO_STATE_CONFIGURING
};
typedef
struct
_tsdb_repo
{
...
...
@@ -75,16 +78,18 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static
int32_t
tsdbDestroyRepoEnv
(
STsdbRepo
*
pRepo
);
static
int
tsdbOpenMetaFile
(
char
*
tsdbDir
);
static
int
tsdbRecoverRepo
(
int
fd
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbInsertDataToTable
(
tsdb_repo_t
*
repo
,
SSubmitBl
oc
k
*
pBlock
);
static
int32_t
tsdbInsertDataToTable
(
tsdb_repo_t
*
repo
,
SSubmitBlk
*
pBlock
);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
#define TSDB_IS_REPO_ACTIVE(pRepo) ((pRepo)->state == TSDB_REPO_STATE_ACTIVE)
#define TSDB_IS_REPO_CLOSED(pRepo) ((pRepo)->state == TSDB_REPO_STATE_CLOSED)
STsdbCfg
*
tsdbCreateDefaultCfg
()
{
STsdbCfg
*
pCfg
=
(
STsdbCfg
*
)
malloc
(
sizeof
(
STsdbCfg
));
if
(
pCfg
==
NULL
)
return
NULL
;
/**
* Set the default TSDB configuration
*/
void
tsdbSetDefaultCfg
(
STsdbCfg
*
pCfg
)
{
if
(
pCfg
==
NULL
)
return
;
pCfg
->
precision
=
-
1
;
pCfg
->
tsdbId
=
0
;
...
...
@@ -94,6 +99,18 @@ STsdbCfg *tsdbCreateDefaultCfg() {
pCfg
->
maxRowsPerFileBlock
=
-
1
;
pCfg
->
keep
=
-
1
;
pCfg
->
maxCacheSize
=
-
1
;
}
/**
* Create a configuration for TSDB default
* @return a pointer to a configuration. the configuration object
* must call tsdbFreeCfg to free memory after usage
*/
STsdbCfg
*
tsdbCreateDefaultCfg
()
{
STsdbCfg
*
pCfg
=
(
STsdbCfg
*
)
malloc
(
sizeof
(
STsdbCfg
));
if
(
pCfg
==
NULL
)
return
NULL
;
tsdbSetDefaultCfg
(
pCfg
);
return
pCfg
;
}
...
...
@@ -102,7 +119,15 @@ void tsdbFreeCfg(STsdbCfg *pCfg) {
if
(
pCfg
!=
NULL
)
free
(
pCfg
);
}
tsdb_repo_t
*
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
,
void
*
limiter
)
{
/**
* Create a new TSDB repository
* @param rootDir the TSDB repository root directory
* @param pCfg the TSDB repository configuration, upper layer need to free the pointer
* @param limiter the limitation tracker will implement in the future, make it void now
*
* @return a TSDB repository handle on success, NULL for failure
*/
tsdb_repo_t
*
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
,
void
*
limiter
/* TODO */
)
{
if
(
rootDir
==
NULL
)
return
NULL
;
if
(
access
(
rootDir
,
F_OK
|
R_OK
|
W_OK
)
==
-
1
)
return
NULL
;
...
...
@@ -120,35 +145,44 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) {
pRepo
->
config
=
*
pCfg
;
pRepo
->
limiter
=
limiter
;
pRepo
->
tsdbMeta
=
tsdbCreateMeta
(
pCfg
->
maxTables
);
if
(
pRepo
->
tsdbMeta
==
NULL
)
{
// Create the environment files and directories
if
(
tsdbSetRepoEnv
(
pRepo
)
<
0
)
{
free
(
pRepo
->
rootDir
);
free
(
pRepo
);
return
NULL
;
}
pRepo
->
tsdbCache
=
tsdbCreateCache
(
5
);
if
(
pRepo
->
tsdbCache
==
NULL
)
{
// Initialize meta
STsdbMeta
*
pMeta
=
tsdbInitMeta
(
rootDir
,
pCfg
->
maxTables
);
if
(
pMeta
==
NULL
)
{
free
(
pRepo
->
rootDir
);
tsdbFreeMeta
(
pRepo
->
tsdbMeta
);
free
(
pRepo
);
return
NULL
;
}
pRepo
->
tsdbMeta
=
pMeta
;
// Create the Meta data file and data directory
if
(
tsdbSetRepoEnv
(
pRepo
)
<
0
)
{
// Initialize cache
STsdbCache
*
pCache
=
tsdbInitCache
(
pCfg
->
maxCacheSize
);
if
(
pCache
==
NULL
)
{
free
(
pRepo
->
rootDir
);
tsdbFreeMeta
(
pRepo
->
tsdbMeta
);
tsdbFreeCache
(
pRepo
->
tsdbCache
);
free
(
pRepo
);
return
NULL
;
}
pRepo
->
tsdbCache
=
pCache
;
pRepo
->
state
=
TSDB_REPO_STATE_ACTIVE
;
return
(
tsdb_repo_t
*
)
pRepo
;
}
/**
* Close and free all resources taken by the repository
* @param repo the TSDB repository handle. The interface will free the handle too, so upper
* layer do NOT need to free the repo handle again.
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbDropRepo
(
tsdb_repo_t
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
...
...
@@ -169,6 +203,12 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) {
return
0
;
}
/**
* Open an existing TSDB storage repository
* @param tsdbDir the existing TSDB root directory
*
* @return a TSDB repository handle on success, NULL for failure and the error number is set
*/
tsdb_repo_t
*
tsdbOpenRepo
(
char
*
tsdbDir
)
{
if
(
access
(
tsdbDir
,
F_OK
|
W_OK
|
R_OK
)
<
0
)
{
return
NULL
;
...
...
@@ -191,7 +231,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
return
NULL
;
}
pRepo
->
tsdbCache
=
tsdb
Create
Cache
(
5
);
pRepo
->
tsdbCache
=
tsdb
Init
Cache
(
5
);
if
(
pRepo
->
tsdbCache
==
NULL
)
{
// TODO: deal with error
return
NULL
;
...
...
@@ -208,6 +248,13 @@ static int32_t tsdbFlushCache(STsdbRepo *pRepo) {
return
0
;
}
/**
* Close a TSDB repository. Only free memory resources, and keep the files.
* @param repo the opened TSDB repository handle. The interface will free the handle too, so upper
* layer do NOT need to free the repo handle again.
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbCloseRepo
(
tsdb_repo_t
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
if
(
pRepo
==
NULL
)
return
0
;
...
...
@@ -223,6 +270,12 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) {
return
0
;
}
/**
* Change the configuration of a repository
* @param pCfg the repository configuration, the upper layer should free the pointer
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbConfigRepo
(
tsdb_repo_t
*
repo
,
STsdbCfg
*
pCfg
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
...
...
@@ -231,22 +284,30 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) {
return
0
;
}
/**
* Get the TSDB repository information, including some statistics
* @param pRepo the TSDB repository handle
* @param error the error number to set when failure occurs
*
* @return a info struct handle on success, NULL for failure and the error number is set. The upper
* layers should free the info handle themselves or memory leak will occur
*/
STsdbRepoInfo
*
tsdbGetStatus
(
tsdb_repo_t
*
pRepo
)
{
// TODO
return
NULL
;
}
int
32_t
tsdbCreateTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
)
{
int
tsdbCreateTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
return
tsdbCreateTableImpl
(
pRepo
->
tsdbMeta
,
pCfg
);
}
int
32_t
tsdbAlterTable
(
tsdb_repo_t
*
pRepo
,
STableCfg
*
pCfg
)
{
int
tsdbAlterTable
(
tsdb_repo_t
*
pRepo
,
STableCfg
*
pCfg
)
{
// TODO
return
0
;
}
int
32_t
tsdbDropTable
(
tsdb_repo_t
*
repo
,
STableId
tableId
)
{
int
tsdbDropTable
(
tsdb_repo_t
*
repo
,
STableId
tableId
)
{
// TODO
if
(
repo
==
NULL
)
return
-
1
;
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
...
...
@@ -261,18 +322,150 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) {
// TODO: need to return the number of data inserted
int32_t
tsdbInsertData
(
tsdb_repo_t
*
repo
,
SSubmitMsg
*
pMsg
)
{
SSubmit
Block
*
pBlock
=
(
SSubmitBlock
*
)
pMsg
->
data
;
SSubmit
MsgIter
msgIter
;
for
(
int
i
=
0
;
i
<
pMsg
->
numOfTables
;
i
++
)
{
// Loop to deal with the submit message
tsdbInitSubmitMsgIter
(
pMsg
,
&
msgIter
);
SSubmitBlk
*
pBlock
;
while
((
pBlock
=
tsdbGetSubmitMsgNext
(
&
msgIter
))
!=
NULL
)
{
if
(
tsdbInsertDataToTable
(
repo
,
pBlock
)
<
0
)
{
return
-
1
;
}
pBlock
=
(
SSubmitBlock
*
)(((
char
*
)
pBlock
)
+
sizeof
(
SSubmitBlock
)
+
pBlock
->
len
);
}
return
0
;
}
/**
* Initialize a table configuration
*/
int
tsdbInitTableCfg
(
STableCfg
*
config
,
TSDB_TABLE_TYPE
type
,
int64_t
uid
,
int32_t
tid
)
{
if
(
config
==
NULL
)
return
-
1
;
if
(
type
!=
TSDB_NTABLE
&&
type
!=
TSDB_STABLE
)
return
-
1
;
memset
((
void
*
)
config
,
0
,
sizeof
(
STableCfg
));
config
->
type
=
type
;
config
->
superUid
=
TSDB_INVALID_SUPER_TABLE_ID
;
config
->
tableId
.
uid
=
uid
;
config
->
tableId
.
tid
=
tid
;
return
0
;
}
/**
* Set the super table UID of the created table
*/
int
tsdbTableSetSuperUid
(
STableCfg
*
config
,
int64_t
uid
)
{
if
(
config
->
type
!=
TSDB_STABLE
)
return
-
1
;
if
(
uid
==
TSDB_INVALID_SUPER_TABLE_ID
)
return
-
1
;
config
->
superUid
=
uid
;
return
0
;
}
/**
* Set the table schema in the configuration
* @param config the configuration to set
* @param pSchema the schema to set
* @param dup use the schema directly or duplicate one for use
*
* @return 0 for success and -1 for failure
*/
int
tsdbTableSetSchema
(
STableCfg
*
config
,
STSchema
*
pSchema
,
bool
dup
)
{
if
(
dup
)
{
config
->
schema
=
tdDupSchema
(
pSchema
);
}
else
{
config
->
schema
=
pSchema
;
}
return
0
;
}
/**
* Set the table schema in the configuration
* @param config the configuration to set
* @param pSchema the schema to set
* @param dup use the schema directly or duplicate one for use
*
* @return 0 for success and -1 for failure
*/
int
tsdbTableSetTagSchema
(
STableCfg
*
config
,
STSchema
*
pSchema
,
bool
dup
)
{
if
(
config
->
type
!=
TSDB_STABLE
)
return
-
1
;
if
(
dup
)
{
config
->
tagSchema
=
tdDupSchema
(
pSchema
);
}
else
{
config
->
tagSchema
=
pSchema
;
}
return
0
;
}
int
tsdbTableSetTagValue
(
STableCfg
*
config
,
SDataRow
row
,
bool
dup
)
{
if
(
config
->
type
!=
TSDB_STABLE
)
return
-
1
;
if
(
dup
)
{
config
->
tagValues
=
tdDataRowDup
(
row
);
}
else
{
config
->
tagValues
=
row
;
}
return
0
;
}
void
tsdbClearTableCfg
(
STableCfg
*
config
)
{
if
(
config
->
schema
)
tdFreeSchema
(
config
->
schema
);
if
(
config
->
tagSchema
)
tdFreeSchema
(
config
->
tagSchema
);
if
(
config
->
tagValues
)
tdFreeDataRow
(
config
->
tagValues
);
}
int
tsdbInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
)
{
if
(
pBlock
->
len
<=
0
)
return
-
1
;
pIter
->
totalLen
=
pBlock
->
len
;
pIter
->
len
=
0
;
pIter
->
row
=
(
SDataRow
)(
pBlock
->
data
);
return
0
;
}
SDataRow
tsdbGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
)
{
SDataRow
row
=
pIter
->
row
;
if
(
row
==
NULL
)
return
NULL
;
pIter
->
len
+=
dataRowLen
(
row
);
if
(
pIter
->
len
>=
pIter
->
totalLen
)
{
pIter
->
row
=
NULL
;
}
else
{
pIter
->
row
=
(
char
*
)
row
+
dataRowLen
(
row
);
}
return
row
;
}
int
tsdbInitSubmitMsgIter
(
SSubmitMsg
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
if
(
pMsg
==
NULL
||
pIter
==
NULL
)
return
-
1
;
pIter
->
totalLen
=
pMsg
->
length
;
pIter
->
len
=
TSDB_SUBMIT_MSG_HEAD_SIZE
;
if
(
pMsg
->
length
<=
TSDB_SUBMIT_MSG_HEAD_SIZE
)
{
pIter
->
pBlock
=
NULL
;
}
else
{
pIter
->
pBlock
=
pMsg
->
blocks
;
}
return
0
;
}
SSubmitBlk
*
tsdbGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
)
{
SSubmitBlk
*
pBlock
=
pIter
->
pBlock
;
if
(
pBlock
==
NULL
)
return
NULL
;
pIter
->
len
=
pIter
->
len
+
sizeof
(
SSubmitBlk
)
+
pBlock
->
len
;
if
(
pIter
->
len
>=
pIter
->
totalLen
)
{
pIter
->
pBlock
=
NULL
;
}
else
{
pIter
->
pBlock
=
(
char
*
)
pBlock
+
pBlock
->
len
+
sizeof
(
SSubmitBlk
);
}
return
pBlock
;
}
// Check the configuration and set default options
static
int32_t
tsdbCheckAndSetDefaultCfg
(
STsdbCfg
*
pCfg
)
{
// Check precision
...
...
@@ -285,7 +478,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// Check tsdbId
if
(
pCfg
->
tsdbId
<
0
)
return
-
1
;
// Check
M
axTables
// Check
m
axTables
if
(
pCfg
->
maxTables
==
-
1
)
{
pCfg
->
maxTables
=
TSDB_DEFAULT_TABLES
;
}
else
{
...
...
@@ -333,10 +526,18 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
return
0
;
}
static
int32_t
tsdbSetRepoEnv
(
STsdbRepo
*
pRepo
)
{
char
*
metaFname
=
tsdbGetFileName
(
pRepo
->
rootDir
,
"tsdb"
,
TSDB_FILE_TYPE_META
);
static
int32_t
tsdbGetCfgFname
(
STsdbRepo
*
pRepo
,
char
*
fname
)
{
if
(
pRepo
==
NULL
)
return
-
1
;
sprintf
(
fname
,
"%s/%s"
,
pRepo
->
rootDir
,
TSDB_CFG_FILE_NAME
);
return
0
;
}
static
int32_t
tsdbSaveConfig
(
STsdbRepo
*
pRepo
)
{
char
fname
[
128
]
=
"
\0
"
;
// TODO: get rid of the literal 128
if
(
tsdbGetCfgFname
(
pRepo
,
fname
)
<
0
)
return
-
1
;
int
fd
=
open
(
metaFname
,
O_WRONLY
|
O_CREAT
);
int
fd
=
open
(
fname
,
O_WRONLY
|
O_CREAT
,
0755
);
if
(
fd
<
0
)
{
return
-
1
;
}
...
...
@@ -345,19 +546,45 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
return
-
1
;
}
// Create the data file
char
*
dirName
=
calloc
(
1
,
strlen
(
pRepo
->
rootDir
)
+
strlen
(
"tsdb"
)
+
2
);
if
(
dirName
==
NULL
)
{
close
(
fd
);
return
0
;
}
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
)
{
char
fname
[
128
]
=
"
\0
"
;
if
(
tsdbGetCfgFname
(
pRepo
,
fname
)
<
0
)
return
-
1
;
int
fd
=
open
(
fname
,
O_RDONLY
);
if
(
fd
<
0
)
{
return
-
1
;
}
sprintf
(
dirName
,
"%s/%s"
,
pRepo
->
rootDir
,
"tsdb"
);
if
(
mkdir
(
dirName
,
0755
)
<
0
)
{
free
(
dirName
);
if
(
read
(
fd
,
(
void
*
)
pCfg
,
sizeof
(
STsdbCfg
))
<
sizeof
(
STsdbCfg
))
{
close
(
fd
);
return
-
1
;
}
free
(
dirName
);
close
(
fd
);
return
0
;
}
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
)
{
if
(
pRepo
==
NULL
||
pRepo
->
rootDir
==
NULL
)
return
-
1
;
sprintf
(
fname
,
"%s/%s"
,
pRepo
->
rootDir
,
TSDB_DATA_DIR_NAME
);
return
0
;
}
static
int32_t
tsdbSetRepoEnv
(
STsdbRepo
*
pRepo
)
{
if
(
tsdbSaveConfig
(
pRepo
)
<
0
)
return
-
1
;
char
dirName
[
128
]
=
"
\0
"
;
if
(
tsdbGetDataDirName
(
pRepo
,
dirName
)
<
0
)
return
-
1
;
if
(
mkdir
(
dirName
,
0755
)
<
0
)
{
return
-
1
;
}
return
0
;
}
...
...
@@ -417,7 +644,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
}
pNode
->
level
=
level
;
tdD
ataRowCpy
(
SL_GET_NODE_DATA
(
pNode
),
row
);
d
ataRowCpy
(
SL_GET_NODE_DATA
(
pNode
),
row
);
// Insert the skiplist node into the data
tsdbInsertRowToTableImpl
(
pNode
,
pTable
);
...
...
@@ -425,23 +652,19 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
return
0
;
}
static
int32_t
tsdbInsertDataToTable
(
tsdb_repo_t
*
repo
,
SSubmitBl
oc
k
*
pBlock
)
{
static
int32_t
tsdbInsertDataToTable
(
tsdb_repo_t
*
repo
,
SSubmitBlk
*
pBlock
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STable
*
pTable
=
tsdbIsValidTableToInsert
(
pRepo
->
tsdbMeta
,
pBlock
->
tableId
);
if
(
pTable
==
NULL
)
{
return
-
1
;
}
if
(
pTable
==
NULL
)
return
-
1
;
SDataRows
rows
=
pBlock
->
data
;
SDataRowsIter
rDataIter
,
*
pIter
;
pIter
=
&
rDataIter
;
SSubmitBlkIter
blkIter
;
SDataRow
row
;
t
dInitSDataRowsIter
(
rows
,
p
Iter
);
while
((
row
=
t
dDataRowsNext
(
p
Iter
))
!=
NULL
)
{
t
sdbInitSubmitBlkIter
(
pBlock
,
&
blk
Iter
);
while
((
row
=
t
sdbGetSubmitBlkNext
(
&
blk
Iter
))
!=
NULL
)
{
if
(
tdInsertRowToTable
(
pRepo
,
row
,
pTable
)
<
0
)
{
// TODO: deal with the error here
return
-
1
;
}
}
...
...
src/vnode/tsdb/src/tsdbMeta.c
浏览文件 @
147239d5
...
...
@@ -9,6 +9,7 @@
#include "tsdbCache.h"
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
#define TSDB_META_FILE_NAME "META"
static
int
tsdbFreeTable
(
STable
*
pTable
);
static
int32_t
tsdbCheckTableCfg
(
STableCfg
*
pCfg
);
...
...
@@ -16,24 +17,85 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable);
static
int
tsdbAddTableIntoMap
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
static
int
tsdbAddTableIntoIndex
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
static
int
tsdbRemoveTableFromIndex
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
static
int
tsdbEstimateTableEncodeSize
(
STable
*
pTable
);
STsdbMeta
*
tsdbCreateMeta
(
int32_t
maxTables
)
{
STsdbMeta
*
pMeta
=
(
STsdbMeta
*
)
malloc
(
sizeof
(
STsdbMeta
));
if
(
pMeta
==
NULL
)
{
return
NULL
;
/**
* Encode a TSDB table object as a binary content
* ASSUMPTIONS: VALID PARAMETERS
*
* @param pTable table object to encode
* @param contLen the encoded binary content length
*
* @return binary content for success
* NULL fro failure
*/
void
*
tsdbEncodeTable
(
STable
*
pTable
,
int
*
contLen
)
{
if
(
pTable
==
NULL
)
return
NULL
;
*
contLen
=
tsdbEstimateTableEncodeSize
(
pTable
);
if
(
*
contLen
<
0
)
return
NULL
;
void
*
ret
=
malloc
(
*
contLen
);
if
(
ret
==
NULL
)
return
NULL
;
// TODO: encode the object to the memory
{}
return
ret
;
}
/**
* Decode from an encoded binary
* ASSUMPTIONS: valid parameters
*
* @param cont binary object
* @param contLen binary length
*
* @return TSDB table object for success
* NULL for failure
*/
STable
*
tsdbDecodeTable
(
void
*
cont
,
int
contLen
)
{
STable
*
pTable
=
(
STable
*
)
calloc
(
1
,
sizeof
(
STable
));
if
(
pTable
==
NULL
)
return
NULL
;
{
// TODO recover from the binary content
}
return
pTable
;
}
void
*
tsdbFreeEncode
(
void
*
cont
)
{
if
(
cont
!=
NULL
)
free
(
cont
);
}
/**
* Initialize the meta handle
* ASSUMPTIONS: VALID PARAMETER
*/
STsdbMeta
*
tsdbInitMeta
(
const
char
*
rootDir
,
int32_t
maxTables
)
{
STsdbMeta
*
pMeta
=
(
STsdbMeta
*
)
malloc
(
sizeof
(
STsdbMeta
));
if
(
pMeta
==
NULL
)
return
NULL
;
pMeta
->
maxTables
=
maxTables
;
pMeta
->
nTables
=
0
;
pMeta
->
s
tables
=
NULL
;
pMeta
->
s
uperList
=
NULL
;
pMeta
->
tables
=
(
STable
**
)
calloc
(
maxTables
,
sizeof
(
STable
*
));
if
(
pMeta
->
tables
==
NULL
)
{
free
(
pMeta
);
return
NULL
;
}
pMeta
->
tableMap
=
taosHashInit
(
maxTables
+
maxTables
/
10
,
taosGetDefaultHashFunction
,
false
);
if
(
pMeta
->
tableMap
==
NULL
)
{
pMeta
->
map
=
taosHashInit
(
maxTables
*
TSDB_META_HASH_FRACTION
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
if
(
pMeta
->
map
==
NULL
)
{
free
(
pMeta
->
tables
);
free
(
pMeta
);
return
NULL
;
}
pMeta
->
mfh
=
tsdbInitMetaFile
(
rootDir
,
maxTables
);
if
(
pMeta
->
mfh
==
NULL
)
{
taosHashCleanup
(
pMeta
->
map
);
free
(
pMeta
->
tables
);
free
(
pMeta
);
return
NULL
;
...
...
@@ -45,6 +107,8 @@ STsdbMeta *tsdbCreateMeta(int32_t maxTables) {
int32_t
tsdbFreeMeta
(
STsdbMeta
*
pMeta
)
{
if
(
pMeta
==
NULL
)
return
0
;
tsdbCloseMetaFile
(
pMeta
->
mfh
);
for
(
int
i
=
0
;
i
<
pMeta
->
maxTables
;
i
++
)
{
if
(
pMeta
->
tables
[
i
]
!=
NULL
)
{
tsdbFreeTable
(
pMeta
->
tables
[
i
]);
...
...
@@ -53,14 +117,14 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
free
(
pMeta
->
tables
);
STable
*
pTable
=
pMeta
->
s
tables
;
STable
*
pTable
=
pMeta
->
s
uperList
;
while
(
pTable
!=
NULL
)
{
STable
*
pTemp
=
pTable
;
pTable
=
pTemp
->
next
;
tsdbFreeTable
(
pTemp
);
}
taosHashCleanup
(
pMeta
->
tableM
ap
);
taosHashCleanup
(
pMeta
->
m
ap
);
free
(
pMeta
);
...
...
@@ -68,74 +132,65 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
}
int32_t
tsdbCreateTableImpl
(
STsdbMeta
*
pMeta
,
STableCfg
*
pCfg
)
{
if
(
tsdbCheckTableCfg
(
pCfg
)
<
0
)
{
return
-
1
;
}
if
(
tsdbCheckTableCfg
(
pCfg
)
<
0
)
return
-
1
;
STable
*
pSTable
=
NULL
;
STable
*
super
=
NULL
;
int
newSuper
=
0
;
if
(
IS_CREATE_STABLE
(
pCfg
))
{
// to create a TSDB_STABLE, check if super table exists
pSTable
=
tsdbGetTableByUid
(
pMeta
,
pCfg
->
stable
Uid
);
if
(
pSTable
==
NULL
)
{
// super table not exists, try to create it
if
(
pCfg
->
type
==
TSDB_STABLE
)
{
super
=
tsdbGetTableByUid
(
pMeta
,
pCfg
->
super
Uid
);
if
(
super
==
NULL
)
{
// super table not exists, try to create it
newSuper
=
1
;
pSTable
=
(
STable
*
)
calloc
(
1
,
sizeof
(
STable
));
if
(
pSTable
==
NULL
)
return
-
1
;
pSTable
->
tableId
.
uid
=
pCfg
->
stableUid
;
pSTable
->
tableId
.
tid
=
-
1
;
pSTable
->
type
=
TSDB_SUPER_TABLE
;
// pSTable->createdTime = pCfg->createdTime; // The created time is not required
pSTable
->
stableUid
=
-
1
;
pSTable
->
numOfCols
=
pCfg
->
numOfCols
;
pSTable
->
pSchema
=
tdDupSchema
(
pCfg
->
schema
);
pSTable
->
content
.
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
sizeof
(
int64_t
),
1
,
// TODO: use function to implement create table object
super
=
(
STable
*
)
calloc
(
1
,
sizeof
(
STable
));
if
(
super
==
NULL
)
return
-
1
;
super
->
type
=
TSDB_SUPER_TABLE
;
super
->
tableId
.
uid
=
pCfg
->
superUid
;
super
->
tableId
.
tid
=
-
1
;
super
->
superUid
=
TSDB_INVALID_SUPER_TABLE_ID
;
super
->
schema
=
tdDupSchema
(
pCfg
->
schema
);
super
->
tagSchema
=
tdDupSchema
(
pCfg
->
tagSchema
);
super
->
tagVal
=
tdDataRowDup
(
pCfg
->
tagValues
);
super
->
content
.
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
sizeof
(
int64_t
),
1
,
0
,
NULL
);
// Allow duplicate key, no lock
if
(
pSTable
->
content
.
pIndex
==
NULL
)
{
free
(
pSTable
);
if
(
super
->
content
.
pIndex
==
NULL
)
{
tdFreeSchema
(
super
->
schema
);
tdFreeSchema
(
super
->
tagSchema
);
tdFreeDataRow
(
super
->
tagVal
);
free
(
super
);
return
-
1
;
}
}
else
{
if
(
pSTable
->
type
!=
TSDB_SUPER_TABLE
)
return
-
1
;
if
(
super
->
type
!=
TSDB_SUPER_TABLE
)
return
-
1
;
}
}
STable
*
pT
able
=
(
STable
*
)
malloc
(
sizeof
(
STable
));
if
(
pT
able
==
NULL
)
{
if
(
newSuper
)
tsdbFreeTable
(
pSTable
);
STable
*
t
able
=
(
STable
*
)
malloc
(
sizeof
(
STable
));
if
(
t
able
==
NULL
)
{
if
(
newSuper
)
tsdbFreeTable
(
super
);
return
-
1
;
}
pTable
->
tableId
=
pCfg
->
tableId
;
pTable
->
createdTime
=
pCfg
->
createdTime
;
table
->
tableId
=
pCfg
->
tableId
;
if
(
IS_CREATE_STABLE
(
pCfg
))
{
// TSDB_STABLE
pT
able
->
type
=
TSDB_STABLE
;
pTable
->
stableUid
=
pCfg
->
stable
Uid
;
pTable
->
pT
agVal
=
tdDataRowDup
(
pCfg
->
tagValues
);
t
able
->
type
=
TSDB_STABLE
;
table
->
superUid
=
pCfg
->
super
Uid
;
table
->
t
agVal
=
tdDataRowDup
(
pCfg
->
tagValues
);
}
else
{
// TSDB_NTABLE
pT
able
->
type
=
TSDB_NTABLE
;
pTable
->
stable
Uid
=
-
1
;
pTable
->
pS
chema
=
tdDupSchema
(
pCfg
->
schema
);
t
able
->
type
=
TSDB_NTABLE
;
table
->
super
Uid
=
-
1
;
table
->
s
chema
=
tdDupSchema
(
pCfg
->
schema
);
}
pT
able
->
content
.
pData
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
0
,
8
,
0
,
0
,
NULL
);
t
able
->
content
.
pData
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
0
,
8
,
0
,
0
,
NULL
);
if
(
newSuper
)
tsdbAddTableToMeta
(
pMeta
,
pSTable
);
tsdbAddTableToMeta
(
pMeta
,
pT
able
);
if
(
newSuper
)
tsdbAddTableToMeta
(
pMeta
,
super
);
tsdbAddTableToMeta
(
pMeta
,
t
able
);
return
0
;
}
STsdbMeta
*
tsdbOpenMeta
(
char
*
tsdbDir
)
{
// TODO : Open meta file for reading
STsdbMeta
*
pMeta
=
(
STsdbMeta
*
)
malloc
(
sizeof
(
STsdbMeta
));
if
(
pMeta
==
NULL
)
{
return
NULL
;
}
return
pMeta
;
}
/**
* Check if a table is valid to insert.
* @return NULL for invalid and the pointer to the table if valid
...
...
@@ -183,9 +238,9 @@ int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
static
int
tsdbFreeTable
(
STable
*
pTable
)
{
// TODO: finish this function
if
(
pTable
->
type
==
TSDB_STABLE
)
{
tdFreeDataRow
(
pTable
->
pT
agVal
);
tdFreeDataRow
(
pTable
->
t
agVal
);
}
else
{
tdFreeSchema
(
pTable
->
pS
chema
);
tdFreeSchema
(
pTable
->
s
chema
);
}
// Free content
...
...
@@ -205,7 +260,7 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg) {
}
STable
*
tsdbGetTableByUid
(
STsdbMeta
*
pMeta
,
int64_t
uid
)
{
void
*
ptr
=
taosHashGet
(
pMeta
->
tableM
ap
,
(
char
*
)(
&
uid
),
sizeof
(
uid
));
void
*
ptr
=
taosHashGet
(
pMeta
->
m
ap
,
(
char
*
)(
&
uid
),
sizeof
(
uid
));
if
(
ptr
==
NULL
)
return
NULL
;
...
...
@@ -215,12 +270,12 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
static
int
tsdbAddTableToMeta
(
STsdbMeta
*
pMeta
,
STable
*
pTable
)
{
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
// add super table to the linked list
if
(
pMeta
->
s
tables
==
NULL
)
{
pMeta
->
s
tables
=
pTable
;
if
(
pMeta
->
s
uperList
==
NULL
)
{
pMeta
->
s
uperList
=
pTable
;
pTable
->
next
=
NULL
;
}
else
{
STable
*
pTemp
=
pMeta
->
s
tables
;
pMeta
->
s
tables
=
pTable
;
STable
*
pTemp
=
pMeta
->
s
uperList
;
pMeta
->
s
uperList
=
pTable
;
pTable
->
next
=
pTemp
;
}
}
else
{
...
...
@@ -244,7 +299,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
static
int
tsdbAddTableIntoMap
(
STsdbMeta
*
pMeta
,
STable
*
pTable
)
{
// TODO: add the table to the map
int64_t
uid
=
pTable
->
tableId
.
uid
;
if
(
taosHashPut
(
pMeta
->
tableM
ap
,
(
char
*
)(
&
uid
),
sizeof
(
uid
),
(
void
*
)(
&
pTable
),
sizeof
(
pTable
))
<
0
)
{
if
(
taosHashPut
(
pMeta
->
m
ap
,
(
char
*
)(
&
uid
),
sizeof
(
uid
),
(
void
*
)(
&
pTable
),
sizeof
(
pTable
))
<
0
)
{
return
-
1
;
}
return
0
;
...
...
@@ -259,4 +314,9 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
assert
(
pTable
->
type
==
TSDB_STABLE
);
// TODO
return
0
;
}
static
int
tsdbEstimateTableEncodeSize
(
STable
*
pTable
)
{
// TODO
return
0
;
}
\ No newline at end of file
src/vnode/tsdb/src/tsdbMetaFile.c
0 → 100644
浏览文件 @
147239d5
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <unistd.h>
#include "taosdef.h"
#include "hash.h"
#include "tsdbMetaFile.h"
#define TSDB_META_FILE_HEADER_SIZE 512
typedef
struct
{
int32_t
offset
;
int32_t
size
;
}
SRecordInfo
;
static
int32_t
tsdbGetMetaFileName
(
char
*
rootDir
,
char
*
fname
);
static
int32_t
tsdbCheckMetaHeader
(
int
fd
);
static
int32_t
tsdbWriteMetaHeader
(
int
fd
);
static
int
tsdbCreateMetaFile
(
char
*
fname
);
static
int
tsdbRestoreFromMetaFile
(
char
*
fname
,
SMetaFile
*
mfh
);
SMetaFile
*
tsdbInitMetaFile
(
char
*
rootDir
,
int32_t
maxTables
)
{
// TODO
char
fname
[
128
]
=
"
\0
"
;
if
(
tsdbGetMetaFileName
(
rootDir
,
fname
)
<
0
)
return
NULL
;
SMetaFile
*
mfh
=
(
SMetaFile
*
)
calloc
(
1
,
sizeof
(
SMetaFile
));
if
(
mfh
==
NULL
)
return
NULL
;
// OPEN MAP
mfh
->
map
=
taosHashInit
(
maxTables
*
TSDB_META_HASH_FRACTION
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
if
(
mfh
->
map
==
NULL
)
{
free
(
mfh
);
return
NULL
;
}
// OPEN FILE
if
(
access
(
fname
,
F_OK
)
<
0
)
{
// file not exists
mfh
->
fd
=
tsdbCreateMetaFile
(
fname
);
if
(
mfh
->
fd
<
0
)
{
taosHashCleanup
(
mfh
->
map
);
free
(
mfh
);
return
NULL
;
}
}
else
{
// file exists, recover from file
if
(
tsdbRestoreFromMetaFile
(
fname
,
mfh
)
<
0
)
{
taosHashCleanup
(
mfh
->
map
);
free
(
mfh
);
return
NULL
;
}
}
return
mfh
;
}
int32_t
tsdbInsertMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
,
void
*
cont
,
int32_t
contLen
)
{
if
(
taosHashGet
(
mfh
->
map
,
(
char
*
)(
&
uid
),
sizeof
(
uid
))
!=
NULL
)
{
return
-
1
;
}
SRecordInfo
info
;
info
.
offset
=
mfh
->
size
;
info
.
size
=
contLen
;
// TODO: Here is not correct
mfh
->
size
+=
(
contLen
+
sizeof
(
SRecordInfo
));
if
(
taosHashPut
(
mfh
->
map
,
(
char
*
)(
&
uid
),
sizeof
(
uid
),
(
void
*
)(
&
info
),
sizeof
(
SRecordInfo
))
<
0
)
{
return
-
1
;
}
// TODO: make below a function to implement
if
(
fseek
(
mfh
->
fd
,
info
.
offset
,
SEEK_CUR
)
<
0
)
{
return
-
1
;
}
if
(
write
(
mfh
->
fd
,
(
void
*
)(
&
info
),
sizeof
(
SRecordInfo
))
<
0
)
{
return
-
1
;
}
if
(
write
(
mfh
->
fd
,
cont
,
contLen
)
<
0
)
{
return
-
1
;
}
fsync
(
mfh
->
fd
);
mfh
->
nRecord
++
;
return
0
;
}
int32_t
tsdbDeleteMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
)
{
char
*
ptr
=
taosHashGet
(
mfh
->
map
,
(
char
*
)(
&
uid
),
sizeof
(
uid
));
if
(
ptr
==
NULL
)
return
-
1
;
SRecordInfo
info
=
*
(
SRecordInfo
*
)
ptr
;
// Remove record from hash table
taosHashRemove
(
mfh
->
map
,
(
char
*
)(
&
uid
),
sizeof
(
uid
));
// Remove record from file
info
.
offset
=
-
info
.
offset
;
if
(
fseek
(
mfh
->
fd
,
-
info
.
offset
,
SEEK_CUR
)
<
0
)
{
return
-
1
;
}
if
(
write
(
mfh
->
fd
,
(
void
*
)(
&
info
),
sizeof
(
SRecordInfo
))
<
0
)
{
return
-
1
;
}
fsync
(
mfh
->
fd
);
mfh
->
nDel
++
;
return
0
;
}
int32_t
tsdbUpdateMetaRecord
(
SMetaFile
*
mfh
,
int64_t
uid
,
void
*
cont
,
int32_t
contLen
)
{
char
*
ptr
=
taosHashGet
(
mfh
->
map
,
(
char
*
)(
&
uid
),
sizeof
(
uid
));
if
(
ptr
==
NULL
)
return
-
1
;
SRecordInfo
info
=
*
(
SRecordInfo
*
)
ptr
;
// Update the hash table
if
(
taosHashPut
(
mfh
->
map
,
(
char
*
)(
&
uid
),
sizeof
(
uid
),
(
void
*
)(
&
info
),
sizeof
(
SRecordInfo
))
<
0
)
{
return
-
1
;
}
// Update record in file
if
(
info
.
size
>=
contLen
)
{
// Just update it in place
info
.
size
=
contLen
;
}
else
{
// Just append to the end of file
info
.
offset
=
mfh
->
size
;
info
.
size
=
contLen
;
mfh
->
size
+=
contLen
;
}
if
(
fseek
(
mfh
->
fd
,
-
info
.
offset
,
SEEK_CUR
)
<
0
)
{
return
-
1
;
}
if
(
write
(
mfh
->
fd
,
(
void
*
)(
&
info
),
sizeof
(
SRecordInfo
))
<
0
)
{
return
-
1
;
}
fsync
(
mfh
->
fd
);
return
0
;
}
void
tsdbCloseMetaFile
(
SMetaFile
*
mfh
)
{
if
(
mfh
==
NULL
)
return
;
close
(
mfh
);
taosHashCleanup
(
mfh
->
map
);
}
static
int32_t
tsdbGetMetaFileName
(
char
*
rootDir
,
char
*
fname
)
{
if
(
rootDir
==
NULL
)
return
-
1
;
sprintf
(
fname
,
"%s/%s"
,
rootDir
,
TSDB_META_FILE_NAME
);
return
0
;
}
static
int32_t
tsdbCheckMetaHeader
(
int
fd
)
{
// TODO: write the meta file header check function
return
0
;
}
static
int32_t
tsdbWriteMetaHeader
(
int
fd
)
{
// TODO: write the meta file header to file
return
0
;
}
static
int
tsdbCreateMetaFile
(
char
*
fname
)
{
int
fd
=
open
(
fname
,
O_RDWR
|
O_CREAT
,
0755
);
if
(
fd
<
0
)
return
-
1
;
if
(
tsdbWriteMetaHeader
(
fd
)
<
0
)
{
close
(
fd
);
return
NULL
;
}
return
fd
;
}
static
int
tsdbCheckMetaFileIntegrety
(
int
fd
)
{
// TODO
return
0
;
}
static
int
tsdbRestoreFromMetaFile
(
char
*
fname
,
SMetaFile
*
mfh
)
{
int
fd
=
open
(
fname
,
O_RDWR
);
if
(
fd
<
0
)
return
-
1
;
if
(
tsdbCheckMetaFileIntegrety
(
fd
)
<
0
)
{
// TODO: decide if to auto-recover the file
close
(
fd
);
return
-
1
;
}
if
(
fseek
(
fd
,
TSDB_META_FILE_HEADER_SIZE
,
SEEK_SET
)
<
0
)
{
// TODO: deal with the error
close
(
fd
);
return
-
1
;
}
mfh
->
fd
=
fd
;
// TODO: iterate to read the meta file to restore the meta data
return
0
;
}
\ No newline at end of file
src/vnode/tsdb/tests/tsdbTests.cpp
浏览文件 @
147239d5
...
...
@@ -3,92 +3,63 @@
#include "tsdb.h"
#include "dataformat.h"
#include "tsdbMeta.h"
TEST
(
TsdbTest
,
createTable
)
{
STsdbMeta
*
pMeta
=
tsdbCreateMeta
(
100
);
ASSERT_NE
(
pMeta
,
nullptr
);
STableCfg
config
;
config
.
tableId
.
tid
=
0
;
config
.
tableId
.
uid
=
98868728187539L
;
config
.
numOfCols
=
5
;
config
.
schema
=
tdNewSchema
(
config
.
numOfCols
);
for
(
int
i
=
0
;
i
<
schemaNCols
(
config
.
schema
);
i
++
)
{
STColumn
*
pCol
=
tdNewCol
(
TSDB_DATA_TYPE_BIGINT
,
i
,
0
);
tdColCpy
(
schemaColAt
(
config
.
schema
,
i
),
pCol
);
tdFreeCol
(
pCol
);
}
config
.
tagValues
=
nullptr
;
tsdbCreateTableImpl
(
pMeta
,
&
config
);
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
config
.
tableId
.
uid
);
ASSERT_NE
(
pTable
,
nullptr
);
}
TEST
(
TsdbTest
,
createRepo
)
{
STsdbCfg
*
pCfg
=
tsdbCreateDefaultCfg
()
;
STsdbCfg
config
;
// Create a tsdb repository
tsdb_repo_t
*
pRepo
=
tsdbCreateRepo
(
"/root/mnt/test/vnode0"
,
pCfg
,
NULL
);
// 1. Create a tsdb repository
tsdbSetDefaultCfg
(
&
config
);
tsdb_repo_t
*
pRepo
=
tsdbCreateRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
&
config
,
NULL
);
ASSERT_NE
(
pRepo
,
nullptr
);
tsdbFreeCfg
(
pCfg
);
// create a normal table in this repository
STableCfg
config
;
config
.
tableId
.
tid
=
0
;
config
.
tableId
.
uid
=
98868728187539L
;
config
.
numOfCols
=
5
;
config
.
schema
=
tdNewSchema
(
config
.
numOfCols
);
STColumn
*
pCol
=
tdNewCol
(
TSDB_DATA_TYPE_TIMESTAMP
,
0
,
0
);
tdColCpy
(
schemaColAt
(
config
.
schema
,
0
),
pCol
);
tdFreeCol
(
pCol
);
for
(
int
i
=
1
;
i
<
schemaNCols
(
config
.
schema
);
i
++
)
{
pCol
=
tdNewCol
(
TSDB_DATA_TYPE_BIGINT
,
i
,
0
);
tdColCpy
(
schemaColAt
(
config
.
schema
,
i
),
pCol
);
tdFreeCol
(
pCol
);
}
// 2. Create a normal table
STableCfg
tCfg
;
ASSERT_EQ
(
tsdbInitTableCfg
(
&
tCfg
,
TSDB_SUPER_TABLE
,
987607499877672L
,
0
),
-
1
);
ASSERT_EQ
(
tsdbInitTableCfg
(
&
tCfg
,
TSDB_NTABLE
,
987607499877672L
,
0
),
0
);
tsdbCreateTable
(
pRepo
,
&
config
)
;
// Write some data
int
nCols
=
5
;
STSchema
*
schema
=
tdNewSchema
(
nCols
);
// int32_t size = sizeof(SSubmitMsg) + sizeof(SSubmitBlock) + tdMaxRowDataBytes(config.schema) * 10 + sizeof(int32_t);
for
(
int
i
=
0
;
i
<
nCols
;
i
++
)
{
if
(
i
==
0
)
{
tdSchemaAppendCol
(
schema
,
TSDB_DATA_TYPE_TIMESTAMP
,
i
,
-
1
);
}
else
{
tdSchemaAppendCol
(
schema
,
TSDB_DATA_TYPE_INT
,
i
,
-
1
);
}
}
// tdUpdateSchema(config.schema
);
tsdbTableSetSchema
(
&
tCfg
,
schema
,
true
);
// SSubmitMsg *pMsg = (SSubmitMsg *)malloc(size);
// pMsg->numOfTables = 1; // TODO: use api
tsdbCreateTable
(
pRepo
,
&
tCfg
);
// SSubmitBlock *pBlock = (SSubmitBlock *)pMsg->data;
// pBlock->tableId = {.uid = 98868728187539L, .tid = 0};
// pBlock->sversion = 0;
// pBlock->len = sizeof(SSubmitBlock);
// // 3. Loop to write some simple data
int
nRows
=
10
;
SSubmitMsg
*
pMsg
=
(
SSubmitMsg
*
)
malloc
(
sizeof
(
SSubmitMsg
)
+
sizeof
(
SSubmitBlk
)
+
tdMaxRowBytesFromSchema
(
schema
)
*
nRows
);
// SDataRows rows = pBlock->data;
// dataRowsInit(rows);
SSubmitBlk
*
pBlock
=
pMsg
->
blocks
;
pBlock
->
tableId
=
{.
uid
=
987607499877672L
,
.
tid
=
0
};
pBlock
->
sversion
=
0
;
pBlock
->
len
=
0
;
int64_t
start_time
=
1584081000000
;
for
(
int
i
=
0
;
i
<
nRows
;
i
++
)
{
int64_t
ttime
=
start_time
+
1000
*
i
;
SDataRow
row
=
(
SDataRow
)(
pBlock
->
data
+
pBlock
->
len
);
tdInitDataRow
(
row
,
schema
);
// SDataRow row = tdNewDataRow(tdMaxRowDataBytes(config.schema));
// int64_t ttime = 1583508800000;
// for (int i = 0; i < 10; i++) { // loop over rows
// ttime += (10000 * i);
// tdDataRowReset(row);
// for (int j = 0; j < schemaNCols(config.schema); j++) {
// if (j == 0) { // set time stamp
// tdAppendColVal(row, (void *)(&ttime), schemaColAt(config.schema, j), 40);
// } else { // set other fields
// int32_t val = 10;
// tdAppendColVal(row, (void *)(&val), schemaColAt(config.schema, j), 40);
// }
// }
for
(
int
j
=
0
;
j
<
schemaNCols
(
schema
);
j
++
)
{
if
(
j
==
0
)
{
// Just for timestamp
tdAppendColVal
(
row
,
(
void
*
)(
&
time
),
schemaColAt
(
schema
,
j
));
}
else
{
// For int
int
val
=
10
;
tdAppendColVal
(
row
,
(
void
*
)(
&
val
),
schemaColAt
(
schema
,
j
));
}
// tdDataRowsAppendRow(rows, row);
// }
}
pBlock
->
len
+=
dataRowLen
(
row
);
// tsdbInsertData(pRepo, pMsg);
}
pMsg
->
length
=
pMsg
->
length
+
sizeof
(
SSubmitBlk
)
+
pBlock
->
len
;
// tdFreeDataRow(row);
tsdbInsertData
(
pRepo
,
pMsg
);
}
tdFreeSchema
(
config
.
schema
);
tsdbDropRepo
(
pRepo
);
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录