Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0cbc8e6e
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
0cbc8e6e
编写于
5月 05, 2020
作者:
H
hzcheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add retention policy
上级
7dcbe810
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
77 addition
and
78 deletion
+77
-78
src/inc/taoserror.h
src/inc/taoserror.h
+1
-0
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+5
-2
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+28
-41
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+25
-8
src/util/inc/ttime.h
src/util/inc/ttime.h
+18
-3
src/util/src/ttime.c
src/util/src/ttime.c
+0
-24
未找到文件。
src/inc/taoserror.h
浏览文件 @
0cbc8e6e
...
...
@@ -115,6 +115,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_ID, 0, 255, "invalid query i
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_STREAM_ID
,
0
,
256
,
"invalid stream id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_CONNECTION
,
0
,
257
,
"invalid connection"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SDB_ERROR
,
0
,
258
,
"sdb error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TIMESTAMP_OUT_OF_RANGE
,
0
,
259
,
"timestamp is out of range"
)
// acct
TAOS_DEFINE_ERROR
(
TSDB_CODE_ACCT_ALREADY_EXIST
,
0
,
300
,
"accounts already exist"
)
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
0cbc8e6e
...
...
@@ -227,13 +227,13 @@ typedef struct {
int
maxFGroups
;
int
numOfFGroups
;
SFileGroup
fGroup
[]
;
SFileGroup
*
fGroup
;
}
STsdbFileH
;
#define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId
#define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
int
maxFiles
);
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
STsdbCfg
*
pCfg
);
void
tsdbCloseFileH
(
STsdbFileH
*
pFileH
);
int
tsdbCreateFile
(
char
*
dataDir
,
int
fileId
,
const
char
*
suffix
,
int
maxTables
,
SFile
*
pFile
,
int
writeHeader
,
int
toClose
);
...
...
@@ -485,6 +485,9 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper);
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
);
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
);
// --------- Other functions need to further organize
void
tsdbFitRetention
(
STsdbRepo
*
pRepo
);
#ifdef __cplusplus
}
#endif
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
0cbc8e6e
...
...
@@ -27,6 +27,7 @@
#include "tchecksum.h"
#include "tsdbMain.h"
#include "tutil.h"
#include "ttime.h"
const
char
*
tsdbFileSuffix
[]
=
{
".head"
,
// TSDB_FILE_TYPE_HEAD
...
...
@@ -40,13 +41,19 @@ static int tsdbWriteFileHead(SFile *pFile);
static
int
tsdbWriteHeadFileIdx
(
SFile
*
pFile
,
int
maxTables
);
static
int
tsdbOpenFGroup
(
STsdbFileH
*
pFileH
,
char
*
dataDir
,
int
fid
);
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
int
maxFiles
)
{
STsdbFileH
*
pFileH
=
(
STsdbFileH
*
)
calloc
(
1
,
sizeof
(
STsdbFileH
)
+
sizeof
(
SFileGroup
)
*
maxFiles
);
STsdbFileH
*
tsdbInitFileH
(
char
*
dataDir
,
STsdbCfg
*
pCfg
)
{
STsdbFileH
*
pFileH
=
(
STsdbFileH
*
)
calloc
(
1
,
sizeof
(
STsdbFileH
));
if
(
pFileH
==
NULL
)
{
// TODO: deal with ERROR here
return
NULL
;
}
pFileH
->
maxFGroups
=
maxFiles
;
pFileH
->
maxFGroups
=
pCfg
->
keep
/
pCfg
->
daysPerFile
+
2
;
pFileH
->
fGroup
=
(
SFileGroup
*
)
calloc
(
pFileH
->
maxFGroups
,
sizeof
(
SFileGroup
));
if
(
pFileH
->
fGroup
==
NULL
)
{
free
(
pFileH
);
return
NULL
;
}
DIR
*
dir
=
opendir
(
dataDir
);
if
(
dir
==
NULL
)
{
...
...
@@ -69,7 +76,12 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
return
pFileH
;
}
void
tsdbCloseFileH
(
STsdbFileH
*
pFileH
)
{
free
(
pFileH
);
}
void
tsdbCloseFileH
(
STsdbFileH
*
pFileH
)
{
if
(
pFileH
)
{
tfree
(
pFileH
->
fGroup
);
free
(
pFileH
);
}
}
static
int
tsdbInitFile
(
char
*
dataDir
,
int
fid
,
const
char
*
suffix
,
SFile
*
pFile
)
{
tsdbGetFileName
(
dataDir
,
fid
,
suffix
,
pFile
->
fname
);
...
...
@@ -161,6 +173,18 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct
}
}
void
tsdbFitRetention
(
STsdbRepo
*
pRepo
)
{
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pGroup
=
pFileH
->
fGroup
;
int
mfid
=
tsdbGetKeyFileId
(
taosGetTimestamp
(
pRepo
->
config
.
precision
),
pRepo
->
config
.
daysPerFile
,
pRepo
->
config
.
precision
);
while
(
pGroup
[
0
].
fileId
<
mfid
)
{
tsdbRemoveFileGroup
(
pFileH
,
pGroup
[
0
].
fileId
);
}
}
void
tsdbSeekFileGroupIter
(
SFileGroupIter
*
pIter
,
int
fid
)
{
if
(
pIter
->
numOfFGroups
==
0
)
{
assert
(
pIter
->
pFileGroup
==
NULL
);
...
...
@@ -252,43 +276,6 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf
return
0
;
}
// int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
// if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
// if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1;
// // TODO: need to check the correctness
// return 0;
// }
// int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
// if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1;
// if (read(pFile->fd, buf, pIdx->len) < 0) return -1;
// // TODO: need to check the correctness
// return 0;
// }
// int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) {
// // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
// if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1;
// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
// if (read(pFile->fd, buf, size) < 0) return -1;
// return 0;
// }
// int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) {
// if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1;
// if (read(pFile->fd, buf, pCol->len) < 0) return -1;
// return 0;
// }
static
int
compFGroupKey
(
const
void
*
key
,
const
void
*
fgroup
)
{
int
fid
=
*
(
int
*
)
key
;
SFileGroup
*
pFGroup
=
(
SFileGroup
*
)
fgroup
;
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
0cbc8e6e
...
...
@@ -6,6 +6,7 @@
#include "tsdbMain.h"
#include "tscompression.h"
#include "tchecksum.h"
#include "ttime.h"
int
tsdbDebugFlag
=
135
;
...
...
@@ -27,7 +28,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static
int32_t
tsdbSetRepoEnv
(
STsdbRepo
*
pRepo
);
static
int32_t
tsdbDestroyRepoEnv
(
STsdbRepo
*
pRepo
);
// static int tsdbOpenMetaFile(char *tsdbDir);
static
int32_t
tsdbInsertDataToTable
(
TsdbRepoT
*
repo
,
SSubmitBlk
*
pBlock
);
static
int32_t
tsdbInsertDataToTable
(
TsdbRepoT
*
repo
,
SSubmitBlk
*
pBlock
,
TSKEY
now
);
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
);
static
void
*
tsdbCommitData
(
void
*
arg
);
...
...
@@ -214,7 +215,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
}
tsdbGetDataDirName
(
pRepo
,
dataDir
);
pRepo
->
tsdbFileH
=
tsdbInitFileH
(
dataDir
,
pRepo
->
config
.
maxTables
);
pRepo
->
tsdbFileH
=
tsdbInitFileH
(
dataDir
,
&
(
pRepo
->
config
)
);
if
(
pRepo
->
tsdbFileH
==
NULL
)
{
tsdbFreeCache
(
pRepo
->
tsdbCache
);
tsdbFreeMeta
(
pRepo
->
tsdbMeta
);
...
...
@@ -394,13 +395,16 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) {
// TODO: need to return the number of data inserted
int32_t
tsdbInsertData
(
TsdbRepoT
*
repo
,
SSubmitMsg
*
pMsg
)
{
SSubmitMsgIter
msgIter
;
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
tsdbInitSubmitMsgIter
(
pMsg
,
&
msgIter
);
SSubmitBlk
*
pBlock
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
TSKEY
now
=
taosGetTimestamp
(
pRepo
->
config
.
precision
);
while
((
pBlock
=
tsdbGetSubmitMsgNext
(
&
msgIter
))
!=
NULL
)
{
if
((
code
=
tsdbInsertDataToTable
(
repo
,
pBlock
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
tsdbInsertDataToTable
(
repo
,
pBlock
,
now
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
...
...
@@ -787,21 +791,31 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
return
0
;
}
static
int32_t
tsdbInsertDataToTable
(
TsdbRepoT
*
repo
,
SSubmitBlk
*
pBlock
)
{
static
int32_t
tsdbInsertDataToTable
(
TsdbRepoT
*
repo
,
SSubmitBlk
*
pBlock
,
TSKEY
now
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STableId
tableId
=
{.
uid
=
pBlock
->
uid
,
.
tid
=
pBlock
->
tid
};
STable
*
pTable
=
tsdbIsValidTableToInsert
(
pRepo
->
tsdbMeta
,
tableId
);
if
(
pTable
==
NULL
)
{
u
Error
(
"failed to get table for insert, uid:%"
PRIu64
", tid:%d"
,
tableId
.
uid
,
tableId
.
tid
);
tsdb
Error
(
"failed to get table for insert, uid:%"
PRIu64
", tid:%d"
,
tableId
.
uid
,
tableId
.
tid
);
return
TSDB_CODE_INVALID_TABLE_ID
;
}
SSubmitBlkIter
blkIter
;
SDataRow
row
;
SSubmitBlkIter
blkIter
=
{
0
};
SDataRow
row
=
NULL
;
TSKEY
minKey
=
now
-
tsMsPerDay
[
pRepo
->
config
.
precision
]
*
pRepo
->
config
.
keep
;
TSKEY
maxKey
=
now
+
tsMsPerDay
[
pRepo
->
config
.
precision
]
*
pRepo
->
config
.
daysPerFile
;
tsdbInitSubmitBlkIter
(
pBlock
,
&
blkIter
);
while
((
row
=
tsdbGetSubmitBlkNext
(
&
blkIter
))
!=
NULL
)
{
if
(
dataRowKey
(
row
)
<
minKey
||
dataRowKey
(
row
)
>
maxKey
)
{
tsdbError
(
"tsdbId: %d, table tid: %d, talbe uid: %ld timestamp is out of range. now: %ld maxKey: %ld, minKey: %ld"
,
pRepo
->
config
.
tsdbId
,
pTable
->
tableId
.
tid
,
pTable
->
tableId
.
uid
,
now
,
minKey
,
maxKey
);
return
TSDB_CODE_TIMESTAMP_OUT_OF_RANGE
;
}
if
(
tdInsertRowToTable
(
pRepo
,
row
,
pTable
)
<
0
)
{
return
-
1
;
}
...
...
@@ -903,6 +917,9 @@ static void *tsdbCommitData(void *arg) {
}
}
// Do retention actions
tsdbFitRetention
(
pRepo
);
_exit:
tdFreeDataCols
(
pDataCols
);
tsdbDestroyTableIters
(
iters
,
pCfg
->
maxTables
);
...
...
src/util/inc/ttime.h
浏览文件 @
0cbc8e6e
...
...
@@ -22,22 +22,37 @@ extern "C" {
#include <stdint.h>
#include <time.h>
#include "tutil.h"
//@return timestamp in second
int32_t
taosGetTimestampSec
();
//@return timestamp in millisecond
int64_t
taosGetTimestampMs
();
static
FORCE_INLINE
int64_t
taosGetTimestampMs
()
{
struct
timeval
systemTime
;
gettimeofday
(
&
systemTime
,
NULL
);
return
(
int64_t
)
systemTime
.
tv_sec
*
1000L
+
(
uint64_t
)
systemTime
.
tv_usec
/
1000
;
}
//@return timestamp in microsecond
int64_t
taosGetTimestampUs
();
static
FORCE_INLINE
int64_t
taosGetTimestampUs
()
{
struct
timeval
systemTime
;
gettimeofday
(
&
systemTime
,
NULL
);
return
(
int64_t
)
systemTime
.
tv_sec
*
1000000L
+
(
uint64_t
)
systemTime
.
tv_usec
;
}
/*
* @return timestamp decided by global conf variable, tsTimePrecision
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
* precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond.
*/
int64_t
taosGetTimestamp
(
int32_t
precision
);
static
FORCE_INLINE
int64_t
taosGetTimestamp
(
int32_t
precision
)
{
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
return
taosGetTimestampUs
();
}
else
{
return
taosGetTimestampMs
();
}
}
int32_t
getTimestampInUsFromStr
(
char
*
token
,
int32_t
tokenlen
,
int64_t
*
ts
);
...
...
src/util/src/ttime.c
浏览文件 @
0cbc8e6e
...
...
@@ -121,30 +121,6 @@ static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
int32_t
taosGetTimestampSec
()
{
return
(
int32_t
)
time
(
NULL
);
}
int64_t
taosGetTimestampMs
()
{
struct
timeval
systemTime
;
gettimeofday
(
&
systemTime
,
NULL
);
return
(
int64_t
)
systemTime
.
tv_sec
*
1000L
+
(
uint64_t
)
systemTime
.
tv_usec
/
1000
;
}
int64_t
taosGetTimestampUs
()
{
struct
timeval
systemTime
;
gettimeofday
(
&
systemTime
,
NULL
);
return
(
int64_t
)
systemTime
.
tv_sec
*
1000000L
+
(
uint64_t
)
systemTime
.
tv_usec
;
}
/*
* If tsTimePrecision == 1, taosGetTimestamp will return timestamp in microsecond.
* Otherwise, it will return timestamp in millisecond.
*/
int64_t
taosGetTimestamp
(
int32_t
precision
)
{
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
return
taosGetTimestampUs
();
}
else
{
return
taosGetTimestampMs
();
}
}
int32_t
taosParseTime
(
char
*
timestr
,
int64_t
*
time
,
int32_t
len
,
int32_t
timePrec
)
{
/* parse datatime string in with tz */
if
(
strnchr
(
timestr
,
'T'
,
len
,
false
)
!=
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录