Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
8517faae
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8517faae
编写于
10月 22, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
finish more code
上级
d6d1532b
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
204 addition
and
78 deletion
+204
-78
src/common/inc/tdisk.h
src/common/inc/tdisk.h
+20
-12
src/common/src/tdisk.c
src/common/src/tdisk.c
+49
-37
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+4
-4
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+14
-4
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+111
-16
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+6
-5
未找到文件。
src/common/inc/tdisk.h
浏览文件 @
8517faae
...
...
@@ -38,6 +38,13 @@ typedef struct {
}
SDiskMeta
;
typedef
struct
{
uint64_t
tsize
;
uint64_t
avail
;
// bytes
}
STiersMeta
;
typedef
struct
{
int
level
;
int
did
;
char
dir
[
TSDB_FILENAME_LEN
];
SDiskMeta
dmeta
;
}
SDisk
;
...
...
@@ -50,6 +57,7 @@ typedef struct {
typedef
struct
SDnodeTier
{
pthread_mutex_t
lock
;
STiersMeta
meta
;
int
nTiers
;
STier
tiers
[
TSDB_MAX_TIERS
];
SHashObj
*
map
;
...
...
@@ -58,7 +66,7 @@ typedef struct SDnodeTier {
extern
struct
SDnodeTier
*
tsDnodeTier
;
#define DNODE_PRIMARY_DISK(pDnodeTier) (pDnodeTier)->tiers[0].disks[0]
static
FORCE_INLINE
int
dnode
LockTiers
(
SDnodeTier
*
pDnodeTier
)
{
static
FORCE_INLINE
int
td
LockTiers
(
SDnodeTier
*
pDnodeTier
)
{
int
code
=
pthread_mutex_lock
(
&
(
pDnodeTier
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
...
...
@@ -67,7 +75,7 @@ static FORCE_INLINE int dnodeLockTiers(SDnodeTier *pDnodeTier) {
return
0
;
}
static
FORCE_INLINE
int
dnode
UnLockTiers
(
SDnodeTier
*
pDnodeTier
)
{
static
FORCE_INLINE
int
td
UnLockTiers
(
SDnodeTier
*
pDnodeTier
)
{
int
code
=
pthread_mutex_unlock
(
&
(
pDnodeTier
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
...
...
@@ -76,7 +84,7 @@ static FORCE_INLINE int dnodeUnLockTiers(SDnodeTier *pDnodeTier) {
return
0
;
}
static
FORCE_INLINE
SDisk
*
dnode
GetDisk
(
SDnodeTier
*
pDnodeTier
,
int
level
,
int
did
)
{
static
FORCE_INLINE
SDisk
*
td
GetDisk
(
SDnodeTier
*
pDnodeTier
,
int
level
,
int
did
)
{
if
(
level
<
0
||
level
>=
pDnodeTier
->
nTiers
)
return
NULL
;
if
(
did
<
0
||
did
>=
pDnodeTier
->
tiers
[
level
].
nDisks
)
return
NULL
;
...
...
@@ -84,15 +92,15 @@ static FORCE_INLINE SDisk *dnodeGetDisk(SDnodeTier *pDnodeTier, int level, int d
return
pDnodeTier
->
tiers
[
level
].
disks
[
did
];
}
SDnodeTier
*
dnode
NewTier
();
void
*
dnode
CloseTier
(
SDnodeTier
*
pDnodeTier
);
int
dnode
AddDisks
(
SDnodeTier
*
pDnodeTier
,
SDiskCfg
*
pDiskCfgs
,
int
ndisks
);
int
dnode
UpdateTiersInfo
(
SDnodeTier
*
pDnodeTier
);
int
dnode
CheckTiers
(
SDnodeTier
*
pDnodeTier
);
SDisk
*
dnode
AssignDisk
(
SDnodeTier
*
pDnodeTier
,
int
level
);
SDisk
*
dnode
GetDiskByName
(
SDnodeTier
*
pDnodeTier
,
char
*
dirName
);
void
dnode
IncDiskFiles
(
SDnodeTier
*
pDnodeTier
,
SDisk
*
pDisk
,
bool
lock
);
void
dnode
DecDiskFiles
(
SDnodeTier
*
pDnodeTier
,
SDisk
*
pDisk
,
bool
lock
);
SDnodeTier
*
td
NewTier
();
void
*
td
CloseTier
(
SDnodeTier
*
pDnodeTier
);
int
td
AddDisks
(
SDnodeTier
*
pDnodeTier
,
SDiskCfg
*
pDiskCfgs
,
int
ndisks
);
int
td
UpdateTiersInfo
(
SDnodeTier
*
pDnodeTier
);
int
td
CheckTiers
(
SDnodeTier
*
pDnodeTier
);
SDisk
*
td
AssignDisk
(
SDnodeTier
*
pDnodeTier
,
int
level
);
SDisk
*
td
GetDiskByName
(
SDnodeTier
*
pDnodeTier
,
char
*
dirName
);
void
td
IncDiskFiles
(
SDnodeTier
*
pDnodeTier
,
SDisk
*
pDisk
,
bool
lock
);
void
td
DecDiskFiles
(
SDnodeTier
*
pDnodeTier
,
SDisk
*
pDisk
,
bool
lock
);
#ifdef __cplusplus
}
...
...
src/common/src/tdisk.c
浏览文件 @
8517faae
...
...
@@ -20,14 +20,14 @@
#define DISK_MIN_FREE_SPACE 30 * 1024 * 1024 // disk free space less than 100M will not create new file again
#define DNODE_DISK_AVAIL(pDisk) ((pDisk)->dmeta.free > DISK_MIN_FREE_SPACE)
static
int
dnode
FormatDir
(
char
*
idir
,
char
*
odir
);
static
int
dnode
CheckDisk
(
char
*
dirName
,
int
level
,
int
primary
);
static
int
dnode
UpdateDiskMeta
(
SDisk
*
pDisk
);
static
int
dnode
AddDisk
(
SDnodeTier
*
pDnodeTier
,
char
*
dir
,
int
level
,
int
primary
);
static
int
td
FormatDir
(
char
*
idir
,
char
*
odir
);
static
int
td
CheckDisk
(
char
*
dirName
,
int
level
,
int
primary
);
static
int
td
UpdateDiskMeta
(
SDisk
*
pDisk
);
static
int
td
AddDisk
(
SDnodeTier
*
pDnodeTier
,
char
*
dir
,
int
level
,
int
primary
);
struct
SDnodeTier
*
tsDnodeTier
=
NULL
;
SDnodeTier
*
dnode
NewTier
()
{
SDnodeTier
*
td
NewTier
()
{
SDnodeTier
*
pDnodeTier
=
(
SDnodeTier
*
)
calloc
(
1
,
sizeof
(
*
pDnodeTier
));
if
(
pDnodeTier
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -37,7 +37,7 @@ SDnodeTier *dnodeNewTier() {
int
ret
=
pthread_mutex_init
(
&
(
pDnodeTier
->
lock
),
NULL
);
if
(
ret
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
ret
);
dnode
CloseTier
(
pDnodeTier
);
td
CloseTier
(
pDnodeTier
);
return
NULL
;
}
...
...
@@ -45,14 +45,14 @@ SDnodeTier *dnodeNewTier() {
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
pDnodeTier
->
map
==
NULL
)
{
terrno
=
TSDB_CODE_COM_OUT_OF_MEMORY
;
dnode
CloseTier
(
pDnodeTier
);
td
CloseTier
(
pDnodeTier
);
return
NULL
;
}
return
pDnodeTier
;
}
void
*
dnode
CloseTier
(
SDnodeTier
*
pDnodeTier
)
{
void
*
td
CloseTier
(
SDnodeTier
*
pDnodeTier
)
{
if
(
pDnodeTier
)
{
if
(
pDnodeTier
->
map
)
{
taosHashCleanup
(
pDnodeTier
->
map
);
...
...
@@ -75,32 +75,42 @@ void *dnodeCloseTier(SDnodeTier *pDnodeTier) {
return
NULL
;
}
int
dnode
AddDisks
(
SDnodeTier
*
pDnodeTier
,
SDiskCfg
*
pDiskCfgs
,
int
ndisks
)
{
int
td
AddDisks
(
SDnodeTier
*
pDnodeTier
,
SDiskCfg
*
pDiskCfgs
,
int
ndisks
)
{
ASSERT
(
ndisks
>
0
);
for
(
int
i
=
0
;
i
<
ndisks
;
i
++
)
{
SDiskCfg
*
pCfg
=
pDiskCfgs
+
i
;
dnode
AddDisk
(
pDnodeTier
,
pCfg
->
dir
,
pCfg
->
level
,
pCfg
->
primary
);
td
AddDisk
(
pDnodeTier
,
pCfg
->
dir
,
pCfg
->
level
,
pCfg
->
primary
);
}
if
(
dnode
CheckTiers
(
pDnodeTier
)
<
0
)
return
-
1
;
if
(
td
CheckTiers
(
pDnodeTier
)
<
0
)
return
-
1
;
return
0
;
}
int
dnodeUpdateTiersInfo
(
SDnodeTier
*
pDnodeTier
)
{
int
tdUpdateTiersInfo
(
SDnodeTier
*
pDnodeTier
)
{
tdLockTiers
(
pDnodeTier
);
pDnodeTier
->
meta
.
tsize
=
0
;
pDnodeTier
->
meta
.
avail
=
0
;
for
(
int
i
=
0
;
i
<
pDnodeTier
->
nTiers
;
i
++
)
{
STier
*
pTier
=
pDnodeTier
->
tiers
+
i
;
for
(
int
j
=
0
;
j
<
pTier
->
nDisks
;
j
++
)
{
SDisk
*
pDisk
=
pTier
->
disks
[
j
];
if
(
dnodeUpdateDiskMeta
(
pDisk
)
<
0
)
return
-
1
;
if
(
tdUpdateDiskMeta
(
pDisk
)
<
0
)
return
-
1
;
pDnodeTier
->
meta
.
tsize
+=
pDisk
->
dmeta
.
size
;
pDnodeTier
->
meta
.
avail
+=
pDisk
->
dmeta
.
free
;
}
}
tdUnLockTiers
(
pDnodeTier
);
return
0
;
}
int
dnode
CheckTiers
(
SDnodeTier
*
pDnodeTier
)
{
int
td
CheckTiers
(
SDnodeTier
*
pDnodeTier
)
{
ASSERT
(
pDnodeTier
->
nTiers
>
0
);
if
(
DNODE_PRIMARY_DISK
(
pDnodeTier
)
==
NULL
)
{
terrno
=
TSDB_CODE_DND_LACK_PRIMARY_DISK
;
...
...
@@ -117,7 +127,7 @@ int dnodeCheckTiers(SDnodeTier *pDnodeTier) {
return
0
;
}
SDisk
*
dnode
AssignDisk
(
SDnodeTier
*
pDnodeTier
,
int
level
)
{
SDisk
*
td
AssignDisk
(
SDnodeTier
*
pDnodeTier
,
int
level
)
{
ASSERT
(
level
<
pDnodeTier
->
nTiers
);
STier
*
pTier
=
pDnodeTier
->
tiers
+
level
;
...
...
@@ -125,11 +135,11 @@ SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) {
ASSERT
(
pTier
->
nDisks
>
0
);
dnode
LockTiers
(
pDnodeTier
);
td
LockTiers
(
pDnodeTier
);
for
(
int
i
=
0
;
i
<
pTier
->
nDisks
;
i
++
)
{
SDisk
*
iDisk
=
pTier
->
disks
[
i
];
if
(
dnode
UpdateDiskMeta
(
iDisk
)
<
0
)
return
NULL
;
if
(
td
UpdateDiskMeta
(
iDisk
)
<
0
)
return
NULL
;
if
(
DNODE_DISK_AVAIL
(
iDisk
))
{
if
(
pDisk
==
NULL
||
pDisk
->
dmeta
.
nfiles
>
iDisk
->
dmeta
.
nfiles
)
{
pDisk
=
iDisk
;
...
...
@@ -139,22 +149,22 @@ SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) {
if
(
pDisk
==
NULL
)
{
terrno
=
TSDB_CODE_DND_NO_DISK_SPACE
;
dnode
UnLockTiers
(
pDnodeTier
);
td
UnLockTiers
(
pDnodeTier
);
return
NULL
;
}
dnode
IncDiskFiles
(
pDnodeTier
,
pDisk
,
false
);
td
IncDiskFiles
(
pDnodeTier
,
pDisk
,
false
);
dnode
UnLockTiers
(
pDnodeTier
);
td
UnLockTiers
(
pDnodeTier
);
return
NULL
;
}
SDisk
*
dnode
GetDiskByName
(
SDnodeTier
*
pDnodeTier
,
char
*
dirName
)
{
SDisk
*
td
GetDiskByName
(
SDnodeTier
*
pDnodeTier
,
char
*
dirName
)
{
char
fdirName
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
SDiskID
*
pDiskID
=
NULL
;
if
(
dnode
FormatDir
(
dirName
,
fdirName
)
<
0
)
{
if
(
td
FormatDir
(
dirName
,
fdirName
)
<
0
)
{
return
NULL
;
}
...
...
@@ -162,34 +172,34 @@ SDisk *dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) {
if
(
ptr
==
NULL
)
return
NULL
;
pDiskID
=
(
SDiskID
*
)
ptr
;
return
dnode
GetDisk
(
pDnodeTier
,
pDiskID
->
level
,
pDiskID
->
did
);
return
td
GetDisk
(
pDnodeTier
,
pDiskID
->
level
,
pDiskID
->
did
);
}
void
dnode
IncDiskFiles
(
SDnodeTier
*
pDnodeTier
,
SDisk
*
pDisk
,
bool
lock
)
{
void
td
IncDiskFiles
(
SDnodeTier
*
pDnodeTier
,
SDisk
*
pDisk
,
bool
lock
)
{
if
(
lock
)
{
dnode
LockTiers
(
pDnodeTier
);
td
LockTiers
(
pDnodeTier
);
}
pDisk
->
dmeta
.
nfiles
++
;
if
(
lock
)
{
dnode
UnLockTiers
(
pDnodeTier
);
td
UnLockTiers
(
pDnodeTier
);
}
}
void
dnode
DecDiskFiles
(
SDnodeTier
*
pDnodeTier
,
SDisk
*
pDisk
,
bool
lock
)
{
void
td
DecDiskFiles
(
SDnodeTier
*
pDnodeTier
,
SDisk
*
pDisk
,
bool
lock
)
{
if
(
lock
)
{
dnode
LockTiers
(
pDnodeTier
);
td
LockTiers
(
pDnodeTier
);
}
pDisk
->
dmeta
.
nfiles
--
;
if
(
lock
)
{
dnode
UnLockTiers
(
pDnodeTier
);
td
UnLockTiers
(
pDnodeTier
);
}
}
static
int
dnode
FormatDir
(
char
*
idir
,
char
*
odir
)
{
static
int
td
FormatDir
(
char
*
idir
,
char
*
odir
)
{
wordexp_t
wep
;
int
code
=
wordexp
(
idir
,
&
wep
,
0
);
...
...
@@ -210,7 +220,7 @@ static int dnodeFormatDir(char *idir, char *odir) {
return
0
;
}
static
int
dnode
CheckDisk
(
char
*
dirName
,
int
level
,
int
primary
)
{
static
int
td
CheckDisk
(
char
*
dirName
,
int
level
,
int
primary
)
{
if
(
access
(
dirName
,
W_OK
|
R_OK
|
F_OK
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -230,7 +240,7 @@ static int dnodeCheckDisk(char *dirName, int level, int primary) {
}
}
static
int
dnode
UpdateDiskMeta
(
SDisk
*
pDisk
)
{
static
int
td
UpdateDiskMeta
(
SDisk
*
pDisk
)
{
struct
statvfs
dstat
;
if
(
statvfs
(
pDisk
->
dir
,
&
dstat
)
<
0
)
{
uError
(
"failed to get dir %s information since %s"
,
pDisk
->
dir
,
strerror
(
errno
));
...
...
@@ -244,7 +254,7 @@ static int dnodeUpdateDiskMeta(SDisk *pDisk) {
return
0
;
}
static
int
dnode
AddDisk
(
SDnodeTier
*
pDnodeTier
,
char
*
dir
,
int
level
,
int
primary
)
{
static
int
td
AddDisk
(
SDnodeTier
*
pDnodeTier
,
char
*
dir
,
int
level
,
int
primary
)
{
char
dirName
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
STier
*
pTier
=
NULL
;
SDiskID
diskid
=
{
0
};
...
...
@@ -256,7 +266,7 @@ static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primar
return
-
1
;
}
if
(
dnode
FormatDir
(
dir
,
dirName
)
<
0
)
{
if
(
td
FormatDir
(
dir
,
dirName
)
<
0
)
{
uError
(
"failed to add disk %s to tier %d level since %s"
,
dir
,
level
,
tstrerror
(
terrno
));
return
-
1
;
}
...
...
@@ -270,13 +280,13 @@ static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primar
return
-
1
;
}
if
(
dnode
GetDiskByName
(
pDnodeTier
,
dirName
)
!=
NULL
)
{
if
(
td
GetDiskByName
(
pDnodeTier
,
dirName
)
!=
NULL
)
{
terrno
=
TSDB_CODE_DND_DISK_ALREADY_EXISTS
;
uError
(
"failed to add disk %s to tier %d level since %s"
,
dir
,
level
,
tstrerror
(
terrno
));
return
-
1
;
}
if
(
dnode
CheckDisk
(
dirName
,
level
,
primary
)
<
0
)
{
if
(
td
CheckDisk
(
dirName
,
level
,
primary
)
<
0
)
{
uError
(
"failed to add disk %s to tier %d level since %s"
,
dir
,
level
,
tstrerror
(
terrno
));
return
-
1
;
}
...
...
@@ -320,6 +330,8 @@ static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primar
}
strncpy
(
pDisk
->
dir
,
dirName
,
TSDB_FILENAME_LEN
);
pDisk
->
level
=
diskid
.
level
;
pDisk
->
did
=
diskid
.
did
;
if
(
taosHashPut
(
pDnodeTier
->
map
,
(
void
*
)
dirName
,
strnlen
(
dirName
,
TSDB_FILENAME_LEN
),
(
void
*
)(
&
diskid
),
sizeof
(
diskid
))
<
0
)
{
...
...
@@ -331,7 +343,7 @@ static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primar
pTier
->
nDisks
++
;
pTier
->
disks
[
diskid
.
did
]
=
pDisk
;
pDnodeTier
->
nTiers
=
MAX
(
pDnodeTier
->
nTiers
,
level
);
pDnodeTier
->
nTiers
=
MAX
(
pDnodeTier
->
nTiers
,
level
+
1
);
return
0
;
}
\ No newline at end of file
src/dnode/src/dnodeMain.c
浏览文件 @
8517faae
...
...
@@ -170,13 +170,13 @@ static void dnodeCheckDataDirOpenned(char *dir) {
}
static
int32_t
dnodeInitStorage
()
{
tsDnodeTier
=
dnode
NewTier
();
tsDnodeTier
=
td
NewTier
();
if
(
tsDnodeTier
==
NULL
)
{
dError
(
"failed to create new dnode tier since %s"
,
tstrerror
(
terrno
));
return
-
1
;
}
if
(
dnode
AddDisks
(
tsDnodeTier
,
tsDiskCfg
,
tsDiskCfgNum
)
<
0
)
{
if
(
td
AddDisks
(
tsDnodeTier
,
tsDiskCfg
,
tsDiskCfgNum
)
<
0
)
{
dError
(
"failed to add disks to dnode tier since %s"
,
tstrerror
(
terrno
));
return
-
1
;
}
...
...
@@ -201,7 +201,7 @@ static int32_t dnodeInitStorage() {
STier
*
pTier
=
tsDnodeTier
->
tiers
+
i
;
for
(
int
j
=
0
;
j
<
pTier
->
nDisks
;
j
++
)
{
SDisk
*
pDisk
=
dnode
GetDisk
(
tsDnodeTier
,
i
,
j
);
SDisk
*
pDisk
=
td
GetDisk
(
tsDnodeTier
,
i
,
j
);
tdGetVnodeRootDir
(
dirName
,
pDisk
->
dir
);
if
(
dnodeCreateDir
(
dirName
)
<
0
)
{
...
...
@@ -225,7 +225,7 @@ static int32_t dnodeInitStorage() {
static
void
dnodeCleanupStorage
()
{
if
(
tsDnodeTier
)
{
dnode
CloseTier
(
tsDnodeTier
);
td
CloseTier
(
tsDnodeTier
);
tsDnodeTier
=
NULL
;
}
}
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
8517faae
...
...
@@ -153,6 +153,14 @@ typedef struct {
// ------------------ tsdbFile.c
extern
const
char
*
tsdbFileSuffix
[];
// minFid <= midFid <= maxFid
typedef
struct
{
int
minFid
;
// >= minFid && < midFid, at level 2
int
midFid
;
// >= midFid && < maxFid, at level 1
int
maxFid
;
// >= maxFid, at level 0
}
SFidGroup
;
typedef
enum
{
TSDB_FILE_TYPE_HEAD
=
0
,
TSDB_FILE_TYPE_DATA
,
...
...
@@ -189,7 +197,9 @@ typedef struct {
typedef
struct
{
int
fileId
;
int
state
;
// 0 for health, 1 for problem
int
state
;
// 0 for health, 1 for problem
int
level
;
int
did
;
SFile
files
[
TSDB_FILE_TYPE_MAX
];
}
SFileGroup
;
...
...
@@ -483,17 +493,17 @@ int tsdbOpenFile(SFile* pFile, int oflag);
void
tsdbCloseFile
(
SFile
*
pFile
);
int
tsdbCreateFile
(
SFile
*
pFile
,
STsdbRepo
*
pRepo
,
int
fid
,
int
type
,
SDisk
*
pDisk
);
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
,
int
flags
);
void
tsdbRemoveFilesBeyondRetention
(
STsdbRepo
*
pRepo
,
int
mfid
);
void
tsdbRemoveFilesBeyondRetention
(
STsdbRepo
*
pRepo
,
SFidGroup
*
pFidGroup
);
int
tsdbUpdateFileHeader
(
SFile
*
pFile
);
int
tsdbEncodeSFileInfo
(
void
**
buf
,
const
STsdbFileInfo
*
pInfo
);
void
*
tsdbDecodeSFileInfo
(
void
*
buf
,
STsdbFileInfo
*
pInfo
);
void
tsdbRemoveFileGroup
(
STsdbRepo
*
pRepo
,
SFileGroup
*
pFGroup
);
int
tsdbLoadFileHeader
(
SFile
*
pFile
,
uint32_t
*
version
);
void
tsdbGetFileInfoImpl
(
char
*
fname
,
uint32_t
*
magic
,
int64_t
*
size
);
void
tsdbGetFidGroup
(
STsdbCfg
*
pCfg
,
SFidGroup
*
pFidGroup
);
void
tsdbGetFidKeyRange
(
int
daysPerFile
,
int8_t
precision
,
int
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
int
tsdbGetCurrMinFid
(
int8_t
precision
,
int32_t
keep
,
int32_t
days
);
int
tsdbGetBaseDirFromFile
(
char
*
fname
,
char
*
baseDir
);
int
tsdbApplyRetention
(
STsdbRepo
*
pRepo
);
int
tsdbApplyRetention
(
STsdbRepo
*
pRepo
,
SFidGroup
*
pFidGroup
);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
8517faae
...
...
@@ -31,10 +31,11 @@ const char * tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h",
static
void
tsdbDestroyFile
(
SFile
*
pFile
);
static
int
compFGroup
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
keyFGroupCompFunc
(
const
void
*
key
,
const
void
*
fgroup
);
static
TSKEY
tsdbGetCurrMinKey
(
int8_t
precision
,
int32_t
keep
);
static
int
tsdbLoadFilesFromDisk
(
STsdbRepo
*
pRepo
,
SDisk
*
pDisk
);
static
SHashObj
*
tsdbGetAllFids
(
STsdbRepo
*
pRepo
,
char
*
dirName
);
static
int
tsdbRestoreFileGroup
(
STsdbRepo
*
pRepo
,
SDisk
*
pDisk
,
int
fid
,
SFileGroup
*
pFileGroup
);
static
int
tsdbGetFidLevel
(
int
fid
,
SFidGroup
*
pFidGroup
);
static
int
tsdbCreateVnodeDataDir
(
char
*
baseDir
,
int
vid
);
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH
*
tsdbNewFileH
(
STsdbCfg
*
pCfg
)
{
...
...
@@ -115,13 +116,15 @@ SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid) {
ASSERT
(
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
)
==
NULL
);
// TODO: think about if (level == 0) is correct
SDisk
*
pDisk
=
dnode
AssignDisk
(
tsDnodeTier
,
0
);
SDisk
*
pDisk
=
td
AssignDisk
(
tsDnodeTier
,
0
);
if
(
pDisk
==
NULL
)
{
tsdbError
(
"vgId:%d failed to create file group %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
return
NULL
;
}
fGroup
.
fileId
=
fid
;
fGroup
.
level
=
pDisk
->
level
;
fGroup
.
did
=
pDisk
->
did
;
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
tsdbCreateFile
(
&
(
fGroup
.
files
[
type
]),
pRepo
,
fid
,
type
,
pDisk
)
<
0
)
goto
_err
;
}
...
...
@@ -136,7 +139,7 @@ _err:
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
tsdbDestroyFile
(
&
(
fGroup
.
files
[
type
]));
}
dnode
DecDiskFiles
(
tsDnodeTier
,
pDisk
,
true
);
td
DecDiskFiles
(
tsDnodeTier
,
pDisk
,
true
);
return
NULL
;
}
...
...
@@ -270,13 +273,13 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
return
(
SFileGroup
*
)
ptr
;
}
void
tsdbRemoveFilesBeyondRetention
(
STsdbRepo
*
pRepo
,
int
mfid
)
{
void
tsdbRemoveFilesBeyondRetention
(
STsdbRepo
*
pRepo
,
SFidGroup
*
pFidGroup
)
{
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pGroup
=
pFileH
->
pFGroup
;
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
while
(
pFileH
->
nFGroups
>
0
&&
pGroup
[
0
].
fileId
<
mf
id
)
{
while
(
pFileH
->
nFGroups
>
0
&&
pGroup
[
0
].
fileId
<
pFidGroup
->
minF
id
)
{
tsdbRemoveFileGroup
(
pRepo
,
pGroup
);
}
...
...
@@ -339,7 +342,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
SFileGroup
fileGroup
=
*
pFGroup
;
tsdbGetBaseDirFromFile
(
fileGroup
.
files
[
0
].
fname
,
baseDir
);
pDisk
=
dnode
GetDiskByName
(
tsDnodeTier
,
baseDir
);
pDisk
=
td
GetDiskByName
(
tsDnodeTier
,
baseDir
);
ASSERT
(
pDisk
!=
NULL
);
int
nFilesLeft
=
pFileH
->
nFGroups
-
(
int
)(
POINTER_DISTANCE
(
pFGroup
,
pFileH
->
pFGroup
)
/
sizeof
(
SFileGroup
)
+
1
);
...
...
@@ -357,7 +360,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
tsdbDestroyFile
(
&
fileGroup
.
files
[
type
]);
}
pDisk
->
dmeta
.
nfiles
--
;
tdDecDiskFiles
(
tsDnodeTier
,
pDisk
,
true
)
;
}
int
tsdbLoadFileHeader
(
SFile
*
pFile
,
uint32_t
*
version
)
{
...
...
@@ -415,8 +418,15 @@ _err:
*
size
=
0
;
}
int
tsdbGetCurrMinFid
(
int8_t
precision
,
int32_t
keep
,
int32_t
days
)
{
return
(
int
)(
TSDB_KEY_FILEID
(
tsdbGetCurrMinKey
(
precision
,
keep
),
days
,
precision
));
void
tsdbGetFidGroup
(
STsdbCfg
*
pCfg
,
SFidGroup
*
pFidGroup
)
{
TSKEY
now
=
taosGetTimestamp
(
pCfg
->
precision
);
pFidGroup
->
minFid
=
TSDB_KEY_FILEID
(
now
-
pCfg
->
keep
*
tsMsPerDay
[
pCfg
->
precision
],
pCfg
->
daysPerFile
,
pCfg
->
precision
);
pFidGroup
->
midFid
=
TSDB_KEY_FILEID
(
now
-
pCfg
->
keep2
*
tsMsPerDay
[
pCfg
->
precision
],
pCfg
->
daysPerFile
,
pCfg
->
precision
);
pFidGroup
->
maxFid
=
TSDB_KEY_FILEID
(
now
-
pCfg
->
keep1
*
tsMsPerDay
[
pCfg
->
precision
],
pCfg
->
daysPerFile
,
pCfg
->
precision
);
}
int
tsdbGetBaseDirFromFile
(
char
*
fname
,
char
*
baseDir
)
{
...
...
@@ -435,8 +445,54 @@ int tsdbGetBaseDirFromFile(char *fname, char *baseDir) {
return
0
;
}
int
tsdbApplyRetention
(
STsdbRepo
*
pRepo
)
{
// TODO
int
tsdbApplyRetention
(
STsdbRepo
*
pRepo
,
SFidGroup
*
pFidGroup
)
{
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pGroup
=
NULL
;
SFileGroup
nFileGroup
=
{
0
};
SFileGroup
oFileGroup
=
{
0
};
int
level
=
0
;
if
(
tsDnodeTier
->
nTiers
==
1
||
(
pFidGroup
->
minFid
==
pFidGroup
->
midFid
&&
pFidGroup
->
midFid
==
pFidGroup
->
maxFid
))
{
return
0
;
}
for
(
int
gidx
=
pFileH
->
nFGroups
-
1
;
gidx
>=
0
;
gidx
--
)
{
pGroup
=
pFileH
->
pFGroup
+
gidx
;
level
=
tsdbGetFidLevel
(
pGroup
->
fileId
,
pFidGroup
);
if
(
level
==
pGroup
->
level
)
continue
;
if
(
level
>
pGroup
->
level
&&
level
<
tsDnodeTier
->
nTiers
)
{
SDisk
*
pODisk
=
tdGetDisk
(
tsDnodeTier
,
pGroup
->
level
,
pGroup
->
did
);
SDisk
*
pDisk
=
tdAssignDisk
(
tsDnodeTier
,
level
);
tsdbCreateVnodeDataDir
(
pDisk
->
dir
,
REPO_ID
(
pRepo
));
oFileGroup
=
*
pGroup
;
nFileGroup
=
*
pGroup
;
nFileGroup
.
level
=
level
;
nFileGroup
.
did
=
pDisk
->
did
;
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
// TODO fileGroup.files[type].fname
}
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
}
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
*
pGroup
=
nFileGroup
;
pthread_rwlock_unlock
(
&
(
pFileH
->
fhlock
));
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
(
void
)
remove
(
oFileGroup
.
files
[
type
].
fname
);
}
tdLockTiers
(
tsDnodeTier
);
tdDecDiskFiles
(
tsDnodeTier
,
pODisk
,
false
);
tdIncDiskFiles
(
tsDnodeTier
,
pDisk
,
false
);
tdUnLockTiers
(
tsDnodeTier
);
}
}
return
0
;
}
...
...
@@ -466,10 +522,6 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) {
}
}
static
TSKEY
tsdbGetCurrMinKey
(
int8_t
precision
,
int32_t
keep
)
{
return
(
TSKEY
)(
taosGetTimestamp
(
precision
)
-
keep
*
tsMsPerDay
[
precision
]);
}
static
int
tsdbLoadFilesFromDisk
(
STsdbRepo
*
pRepo
,
SDisk
*
pDisk
)
{
char
tsdbDataDir
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
char
tsdbRootDir
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
...
...
@@ -479,6 +531,7 @@ static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) {
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
fgroup
=
{
0
};
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
SFidGroup
fidGroup
=
{
0
};
int
mfid
=
0
;
tdGetTsdbRootDir
(
pDisk
->
dir
,
REPO_ID
(
pRepo
),
tsdbRootDir
);
...
...
@@ -494,7 +547,8 @@ static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) {
goto
_err
;
}
mfid
=
tsdbGetCurrMinFid
(
pCfg
->
precision
,
pCfg
->
keep
,
pCfg
->
daysPerFile
);
tsdbGetFidGroup
(
pCfg
,
&
fidGroup
);
mfid
=
fidGroup
.
minFid
;
while
(
taosHashIterNext
(
pIter
))
{
int32_t
fid
=
*
(
int32_t
*
)
taosHashIterGet
(
pIter
);
...
...
@@ -677,4 +731,45 @@ _err:
if
(
dir
!=
NULL
)
closedir
(
dir
);
regfree
(
&
regex
);
return
NULL
;
}
static
int
tsdbGetFidLevel
(
int
fid
,
SFidGroup
*
pFidGroup
)
{
if
(
fid
>=
pFidGroup
->
maxFid
)
{
return
0
;
}
else
if
(
fid
>=
pFidGroup
->
midFid
&&
fid
<
pFidGroup
->
maxFid
)
{
return
1
;
}
else
{
return
2
;
}
}
static
int
tsdbCreateVnodeDataDir
(
char
*
baseDir
,
int
vid
)
{
char
dirName
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
char
tsdbRootDir
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
tdGetVnodeRootDir
(
baseDir
,
dirName
);
if
(
taosMkDir
(
dirName
,
0755
)
<
0
&&
errno
!=
EEXIST
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
tdGetVnodeDir
(
baseDir
,
vid
,
dirName
);
if
(
taosMkDir
(
dirName
,
0755
)
<
0
&&
errno
!=
EEXIST
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
tdGetTsdbRootDir
(
baseDir
,
vid
,
tsdbRootDir
);
if
(
taosMkDir
(
tsdbRootDir
,
0755
)
<
0
&&
errno
!=
EEXIST
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
tdGetTsdbDataDir
(
baseDir
,
vid
,
dirName
);
if
(
taosMkDir
(
dirName
,
0755
)
<
0
&&
errno
!=
EEXIST
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
return
0
;
}
\ No newline at end of file
src/tsdb/src/tsdbMemTable.c
浏览文件 @
8517faae
...
...
@@ -471,6 +471,7 @@ static void *tsdbCommitData(void *arg) {
SDataCols
*
pDataCols
=
NULL
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SCommitIter
*
iters
=
NULL
;
SFidGroup
fidGroup
=
{
0
};
SRWHelper
whelper
=
{
0
};
TSKEY
minKey
=
0
,
maxKey
=
0
;
ASSERT
(
pRepo
->
commit
==
1
);
...
...
@@ -479,9 +480,9 @@ static void *tsdbCommitData(void *arg) {
tsdbInfo
(
"vgId:%d start to commit! keyFirst %"
PRId64
" keyLast %"
PRId64
" numOfRows %"
PRId64
,
REPO_ID
(
pRepo
),
pMem
->
keyFirst
,
pMem
->
keyLast
,
pMem
->
numOfRows
);
int
mfid
=
tsdbGetCurrMinFid
(
pCfg
->
precision
,
pCfg
->
keep
,
pCfg
->
daysPerFile
);
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
mf
id
,
&
minKey
,
&
maxKey
);
tsdbRemoveFilesBeyondRetention
(
pRepo
,
mfid
);
tsdbGetFidGroup
(
pCfg
,
&
fidGroup
);
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fidGroup
.
minF
id
,
&
minKey
,
&
maxKey
);
tsdbRemoveFilesBeyondRetention
(
pRepo
,
&
fidGroup
);
// Create the iterator to read from cache
if
(
pMem
->
numOfRows
>
0
)
{
...
...
@@ -510,7 +511,7 @@ static void *tsdbCommitData(void *arg) {
// Loop to commit to each file
for
(
int
fid
=
sfid
;
fid
<=
efid
;
fid
++
)
{
if
(
fid
<
mf
id
)
continue
;
if
(
fid
<
fidGroup
.
minF
id
)
continue
;
if
(
tsdbCommitToFile
(
pRepo
,
fid
,
iters
,
&
whelper
,
pDataCols
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit to file %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
...
...
@@ -519,7 +520,7 @@ static void *tsdbCommitData(void *arg) {
}
}
tsdbApplyRetention
(
pRepo
);
tsdbApplyRetention
(
pRepo
,
&
fidGroup
);
// Commit to update meta file
if
(
tsdbCommitMeta
(
pRepo
)
<
0
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录