Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6536ad72
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
6536ad72
编写于
9月 27, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
chore: update file naming for tsdb retention
上级
5c61e5da
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
221 addition
and
351 deletion
+221
-351
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+1
-1
source/dnode/vnode/src/tsdb/tsdbRetention.c
source/dnode/vnode/src/tsdb/tsdbRetention.c
+220
-67
source/dnode/vnode/src/tsdb/tsdbRetention2.c
source/dnode/vnode/src/tsdb/tsdbRetention2.c
+0
-283
未找到文件。
source/dnode/vnode/CMakeLists.txt
浏览文件 @
6536ad72
...
@@ -49,7 +49,7 @@ target_sources(
...
@@ -49,7 +49,7 @@ target_sources(
"src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbCacheRead.c"
"src/tsdb/tsdbCacheRead.c"
"src/tsdb/tsdbRetention
2
.c"
"src/tsdb/tsdbRetention.c"
"src/tsdb/tsdbDiskData.c"
"src/tsdb/tsdbDiskData.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbMergeTree.c"
"src/tsdb/tsdbMergeTree.c"
...
...
source/dnode/vnode/src/tsdb/tsdbRetention.c
浏览文件 @
6536ad72
...
@@ -15,16 +15,16 @@
...
@@ -15,16 +15,16 @@
#include "tsdb.h"
#include "tsdb.h"
static
bool
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
enum
{
RETENTION_NO
=
0
,
RETENTION_EXPIRED
=
1
,
RETENTION_MIGRATE
=
2
};
if
(
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
)
==
0
)
{
return
false
;
}
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
0
);
#define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB
if
(
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
)
<
0
)
{
#define MIGRATE_MIN_COST (5) // second
return
true
;
}
static
bool
tsdbShouldDoMigrate
(
STsdb
*
pTsdb
);
static
int32_t
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
);
static
int32_t
tsdbProcessRetention
(
STsdb
*
pTsdb
,
int64_t
now
,
int64_t
maxSpeed
,
int32_t
retention
,
int8_t
type
);
static
bool
tsdbShouldDoMigrate
(
STsdb
*
pTsdb
)
{
if
(
tfsGetLevel
(
pTsdb
->
pVnode
->
pTfs
)
<
2
)
{
if
(
tfsGetLevel
(
pTsdb
->
pVnode
->
pTfs
)
<
2
)
{
return
false
;
return
false
;
}
}
...
@@ -33,98 +33,251 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
...
@@ -33,98 +33,251 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
if
(
keepCfg
->
keep0
==
keepCfg
->
keep1
&&
keepCfg
->
keep1
==
keepCfg
->
keep2
)
{
if
(
keepCfg
->
keep0
==
keepCfg
->
keep1
&&
keepCfg
->
keep1
==
keepCfg
->
keep2
)
{
return
false
;
return
false
;
}
}
return
true
;
}
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
);
iSet
++
)
{
static
int32_t
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
iSet
);
int32_t
retention
=
RETENTION_NO
;
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
keepCfg
,
now
);
if
(
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
)
==
0
)
{
SDiskID
did
;
return
retention
;
}
if
(
expLevel
==
pSet
->
diskId
.
level
)
continue
;
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
0
);
if
(
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
)
<
0
)
{
retention
|=
RETENTION_EXPIRED
;
}
if
(
expLevel
<
0
)
{
if
(
!
tsdbShouldDoMigrate
(
pTsdb
))
{
return
true
;
return
retention
;
}
else
{
}
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
return
false
;
}
if
(
did
.
level
==
pSet
->
diskId
.
level
)
continue
;
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
);
++
iSet
)
{
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
if
(
expLevel
==
pSet
->
diskId
.
level
)
continue
;
return
true
;
if
(
expLevel
>
0
)
{
retention
|=
RETENTION_MIGRATE
;
break
;
}
}
}
}
return
false
;
return
retention
;
}
}
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
/**
* @brief process retention
*
* @param pTsdb
* @param now
* @param maxSpeed
* @param retention
* @param type 0 RETENTION_EXPIRED, 1 RETENTION_MIGRATE
* @return int32_t
*/
static
int32_t
tsdbProcessRetention
(
STsdb
*
pTsdb
,
int64_t
now
,
int64_t
maxSpeed
,
int32_t
retention
,
int8_t
type
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
nBatch
=
0
;
int32_t
nLoops
=
0
;
int32_t
maxFid
=
0
;
int64_t
fSize
=
0
;
int64_t
speed
=
maxSpeed
>
0
?
maxSpeed
:
MIGRATE_MAX_SPEED
;
STsdbFS
fs
=
{
0
};
STsdbFS
fsLatest
=
{
0
};
if
(
!
tsdbShouldDoRetention
(
pTsdb
,
now
))
{
if
(
!
(
retention
&
type
))
{
return
code
;
goto
_exit
;
}
}
// do retention
_retention_loop:
STsdbFS
fs
;
// reset
maxFid
=
INT32_MIN
;
fSize
=
0
;
tsdbFSDestroy
(
&
fs
);
tsdbFSDestroy
(
&
fsLatest
);
if
(
atomic_load_8
(
&
pTsdb
->
trimHdl
.
commitInWait
)
==
1
)
{
atomic_store_32
(
&
pTsdb
->
trimHdl
.
maxRetentFid
,
INT32_MIN
);
taosMsleep
(
50
);
}
code
=
tsdbFSCopy
(
pTsdb
,
&
fs
);
code
=
tsdbFSCopy
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_exit
;
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
fs
.
aDFileSet
);
iSet
++
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
expLevel
<
0
)
{
taosMemoryFree
(
pSet
->
pHeadF
);
taosMemoryFree
(
pSet
->
pDataF
);
taosMemoryFree
(
pSet
->
aSttF
[
0
]);
taosMemoryFree
(
pSet
->
pSmaF
);
taosArrayRemove
(
fs
.
aDFileSet
,
iSet
);
iSet
--
;
}
else
{
if
(
expLevel
==
0
)
continue
;
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
code
=
terrno
;
goto
_exit
;
}
if
(
did
.
level
==
pSet
->
diskId
.
level
)
continue
;
int32_t
fsSize
=
taosArrayGetSize
(
fs
.
aDFileSet
);
if
(
type
==
RETENTION_MIGRATE
)
{
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
++
iSet
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
// copy file to new disk (todo)
if
(
pSet
->
diskId
.
level
==
expLevel
)
continue
;
SDFileSet
fSet
=
*
pSet
;
fSet
.
diskId
=
did
;
code
=
tsdbDFileSetCopy
(
pTsdb
,
pSet
,
&
fSet
);
if
(
expLevel
>
0
)
{
if
(
code
)
goto
_err
;
ASSERT
(
pSet
->
fid
>
maxFid
);
maxFid
=
pSet
->
fid
;
fSize
+=
(
pSet
->
pDataF
->
size
+
pSet
->
pHeadF
->
size
+
pSet
->
pSmaF
->
size
);
if
(
fSize
/
speed
>
MIGRATE_MIN_COST
)
{
break
;
}
for
(
int32_t
iStt
=
0
;
iStt
<
pSet
->
nSttF
;
++
iStt
)
{
fSize
+=
pSet
->
aSttF
[
iStt
]
->
size
;
}
if
(
fSize
/
speed
>
MIGRATE_MIN_COST
)
{
tsdbInfo
(
"vgId:%d migrate loop[%d] with maxFid:%d"
,
TD_VID
(
pTsdb
->
pVnode
),
nBatch
,
maxFid
);
break
;
}
}
}
}
else
if
(
type
==
RETENTION_EXPIRED
)
{
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
++
iSet
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
code
=
tsdbFSUpsertFSet
(
&
fs
,
&
fSet
);
if
(
expLevel
<
0
)
{
if
(
code
)
goto
_err
;
SET_DFSET_EXPIRED
(
pSet
);
if
(
pSet
->
fid
>
maxFid
)
maxFid
=
pSet
->
fid
;
}
else
{
break
;
}
}
}
}
}
// do change fs
if
(
maxFid
==
INT32_MIN
)
goto
_exit
;
code
=
tsdbFSCommit1
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_err
;
_commit_conflict_check:
while
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
)
<=
maxFid
)
{
if
(
++
nLoops
>
1000
)
{
nLoops
=
0
;
sched_yield
();
}
}
if
(
atomic_val_compare_exchange_8
(
&
pTsdb
->
trimHdl
.
state
,
0
,
1
)
==
0
)
{
if
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
)
<=
maxFid
)
{
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
goto
_commit_conflict_check
;
}
atomic_store_32
(
&
pTsdb
->
trimHdl
.
maxRetentFid
,
maxFid
);
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
}
else
{
goto
_commit_conflict_check
;
}
// migrate
if
(
type
==
RETENTION_MIGRATE
)
{
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
++
iSet
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
pSet
->
fid
>
maxFid
)
break
;
tsdbInfo
(
"vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d"
,
TD_VID
(
pTsdb
->
pVnode
),
nBatch
,
maxFid
,
pSet
->
fid
,
pSet
->
diskId
.
id
,
pSet
->
diskId
.
level
,
expLevel
);
if
(
expLevel
<
0
)
{
SET_DFSET_EXPIRED
(
pSet
);
}
else
{
if
(
expLevel
==
pSet
->
diskId
.
level
)
continue
;
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
code
=
terrno
;
goto
_exit
;
}
if
(
did
.
level
==
pSet
->
diskId
.
level
)
continue
;
// copy file to new disk
SDFileSet
fSet
=
*
pSet
;
fSet
.
diskId
=
did
;
code
=
tsdbDFileSetCopy
(
pTsdb
,
pSet
,
&
fSet
,
maxSpeed
);
if
(
code
)
goto
_exit
;
code
=
tsdbFSUpsertFSet
(
&
fs
,
&
fSet
);
if
(
code
)
goto
_exit
;
}
}
}
_merge_fs:
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
code
=
tsdbFSCommit2
(
pTsdb
,
&
fs
);
if
((
code
=
tsdbFSCopy
(
pTsdb
,
&
fsLatest
)))
{
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
goto
_exit
;
}
// 1) merge tsdbFSNew and pTsdb->fs
if
((
code
=
tsdbFSUpdDel
(
pTsdb
,
&
fsLatest
,
&
fs
,
maxFid
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 2) save CURRENT
if
((
code
=
tsdbFSCommit1
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 3) apply the tsdbFS to pTsdb->fs
if
((
code
=
tsdbFSCommit2
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
}
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbFSDestroy
(
&
fs
);
if
(
type
==
RETENTION_MIGRATE
)
{
++
nBatch
;
goto
_retention_loop
;
}
_exit:
_exit:
tsdbFSDestroy
(
&
fs
);
tsdbFSDestroy
(
&
fsLatest
);
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d, tsdb do retention %"
PRIi8
" failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
type
,
tstrerror
(
code
));
ASSERT
(
0
);
}
return
code
;
return
code
;
}
/**
* @brief Data migration between multi-tier storage, including remove expired data.
* 1) firstly, remove expired DFileSet;
* 2) partition the tsdbFS by the expLevel and the estimated cost(e.g. 5s) to copy, and migrate
* DFileSet groups between multi-tier storage;
* 3) update the tsdbFS and CURRENT in the same transaction;
* 4) finish the migration
* @param pTsdb
* @param now
* @param maxSpeed
* @return int32_t
*/
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
,
int64_t
maxSpeed
)
{
int32_t
code
=
0
;
int32_t
retention
=
RETENTION_NO
;
retention
=
tsdbShouldDoRetention
(
pTsdb
,
now
);
if
(
retention
==
RETENTION_NO
)
{
goto
_exit
;
}
_err:
// step 1: process expire
tsdbError
(
"vgId:%d, tsdb do retention failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
code
=
tsdbProcessRetention
(
pTsdb
,
now
,
maxSpeed
,
retention
,
RETENTION_EXPIRED
);
ASSERT
(
0
);
if
(
code
<
0
)
goto
_exit
;
// tsdbFSRollback(pTsdb->pFS);
// step 2: process multi-tier migration
code
=
tsdbProcessRetention
(
pTsdb
,
now
,
maxSpeed
,
retention
,
RETENTION_MIGRATE
);
if
(
code
<
0
)
goto
_exit
;
_exit:
pTsdb
->
trimHdl
.
maxRetentFid
=
INT32_MIN
;
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d, tsdb do retention %d failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
retention
,
tstrerror
(
code
));
ASSERT
(
0
);
// tsdbFSRollback(pTsdb->pFS);
}
else
{
tsdbInfo
(
"vgId:%d, tsdb do retention %d succeed"
,
TD_VID
(
pTsdb
->
pVnode
),
retention
);
}
return
code
;
return
code
;
}
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbRetention2.c
已删除
100644 → 0
浏览文件 @
5c61e5da
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
enum
{
RETENTION_NO
=
0
,
RETENTION_EXPIRED
=
1
,
RETENTION_MIGRATE
=
2
};
#define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB
#define MIGRATE_MIN_COST (5) // second
static
bool
tsdbShouldDoMigrate
(
STsdb
*
pTsdb
);
static
int32_t
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
);
static
int32_t
tsdbProcessRetention
(
STsdb
*
pTsdb
,
int64_t
now
,
int64_t
maxSpeed
,
int32_t
retention
,
int8_t
type
);
static
bool
tsdbShouldDoMigrate
(
STsdb
*
pTsdb
)
{
if
(
tfsGetLevel
(
pTsdb
->
pVnode
->
pTfs
)
<
2
)
{
return
false
;
}
STsdbKeepCfg
*
keepCfg
=
&
pTsdb
->
keepCfg
;
if
(
keepCfg
->
keep0
==
keepCfg
->
keep1
&&
keepCfg
->
keep1
==
keepCfg
->
keep2
)
{
return
false
;
}
return
true
;
}
static
int32_t
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
int32_t
retention
=
RETENTION_NO
;
if
(
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
)
==
0
)
{
return
retention
;
}
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
0
);
if
(
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
)
<
0
)
{
retention
|=
RETENTION_EXPIRED
;
}
if
(
!
tsdbShouldDoMigrate
(
pTsdb
))
{
return
retention
;
}
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
);
++
iSet
)
{
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
if
(
expLevel
==
pSet
->
diskId
.
level
)
continue
;
if
(
expLevel
>
0
)
{
retention
|=
RETENTION_MIGRATE
;
break
;
}
}
return
retention
;
}
/**
* @brief process retention
*
* @param pTsdb
* @param now
* @param maxSpeed
* @param retention
* @param type 0 RETENTION_EXPIRED, 1 RETENTION_MIGRATE
* @return int32_t
*/
static
int32_t
tsdbProcessRetention
(
STsdb
*
pTsdb
,
int64_t
now
,
int64_t
maxSpeed
,
int32_t
retention
,
int8_t
type
)
{
int32_t
code
=
0
;
int32_t
nBatch
=
0
;
int32_t
nLoops
=
0
;
int32_t
maxFid
=
0
;
int64_t
fSize
=
0
;
int64_t
speed
=
maxSpeed
>
0
?
maxSpeed
:
MIGRATE_MAX_SPEED
;
STsdbFS
fs
=
{
0
};
STsdbFS
fsLatest
=
{
0
};
if
(
!
(
retention
&
type
))
{
goto
_exit
;
}
_retention_loop:
// reset
maxFid
=
INT32_MIN
;
fSize
=
0
;
tsdbFSDestroy
(
&
fs
);
tsdbFSDestroy
(
&
fsLatest
);
if
(
atomic_load_8
(
&
pTsdb
->
trimHdl
.
commitInWait
)
==
1
)
{
atomic_store_32
(
&
pTsdb
->
trimHdl
.
maxRetentFid
,
INT32_MIN
);
taosMsleep
(
50
);
}
code
=
tsdbFSCopy
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_exit
;
int32_t
fsSize
=
taosArrayGetSize
(
fs
.
aDFileSet
);
if
(
type
==
RETENTION_MIGRATE
)
{
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
++
iSet
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
pSet
->
diskId
.
level
==
expLevel
)
continue
;
if
(
expLevel
>
0
)
{
ASSERT
(
pSet
->
fid
>
maxFid
);
maxFid
=
pSet
->
fid
;
fSize
+=
(
pSet
->
pDataF
->
size
+
pSet
->
pHeadF
->
size
+
pSet
->
pSmaF
->
size
);
if
(
fSize
/
speed
>
MIGRATE_MIN_COST
)
{
break
;
}
for
(
int32_t
iStt
=
0
;
iStt
<
pSet
->
nSttF
;
++
iStt
)
{
fSize
+=
pSet
->
aSttF
[
iStt
]
->
size
;
}
if
(
fSize
/
speed
>
MIGRATE_MIN_COST
)
{
tsdbInfo
(
"vgId:%d migrate loop[%d] with maxFid:%d"
,
TD_VID
(
pTsdb
->
pVnode
),
nBatch
,
maxFid
);
break
;
}
}
}
}
else
if
(
type
==
RETENTION_EXPIRED
)
{
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
++
iSet
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
expLevel
<
0
)
{
SET_DFSET_EXPIRED
(
pSet
);
if
(
pSet
->
fid
>
maxFid
)
maxFid
=
pSet
->
fid
;
}
else
{
break
;
}
}
}
if
(
maxFid
==
INT32_MIN
)
goto
_exit
;
_commit_conflict_check:
while
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
)
<=
maxFid
)
{
if
(
++
nLoops
>
1000
)
{
nLoops
=
0
;
sched_yield
();
}
}
if
(
atomic_val_compare_exchange_8
(
&
pTsdb
->
trimHdl
.
state
,
0
,
1
)
==
0
)
{
if
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
)
<=
maxFid
)
{
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
goto
_commit_conflict_check
;
}
atomic_store_32
(
&
pTsdb
->
trimHdl
.
maxRetentFid
,
maxFid
);
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
}
else
{
goto
_commit_conflict_check
;
}
// migrate
if
(
type
==
RETENTION_MIGRATE
)
{
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
++
iSet
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
pSet
->
fid
>
maxFid
)
break
;
tsdbInfo
(
"vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d"
,
TD_VID
(
pTsdb
->
pVnode
),
nBatch
,
maxFid
,
pSet
->
fid
,
pSet
->
diskId
.
id
,
pSet
->
diskId
.
level
,
expLevel
);
if
(
expLevel
<
0
)
{
SET_DFSET_EXPIRED
(
pSet
);
}
else
{
if
(
expLevel
==
pSet
->
diskId
.
level
)
continue
;
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
code
=
terrno
;
goto
_exit
;
}
if
(
did
.
level
==
pSet
->
diskId
.
level
)
continue
;
// copy file to new disk
SDFileSet
fSet
=
*
pSet
;
fSet
.
diskId
=
did
;
code
=
tsdbDFileSetCopy
(
pTsdb
,
pSet
,
&
fSet
,
maxSpeed
);
if
(
code
)
goto
_exit
;
code
=
tsdbFSUpsertFSet
(
&
fs
,
&
fSet
);
if
(
code
)
goto
_exit
;
}
}
}
_merge_fs:
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
if
((
code
=
tsdbFSCopy
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 1) merge tsdbFSNew and pTsdb->fs
if
((
code
=
tsdbFSUpdDel
(
pTsdb
,
&
fsLatest
,
&
fs
,
maxFid
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 2) save CURRENT
if
((
code
=
tsdbFSCommit1
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 3) apply the tsdbFS to pTsdb->fs
if
((
code
=
tsdbFSCommit2
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
type
==
RETENTION_MIGRATE
)
{
++
nBatch
;
goto
_retention_loop
;
}
_exit:
tsdbFSDestroy
(
&
fs
);
tsdbFSDestroy
(
&
fsLatest
);
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d, tsdb do retention %"
PRIi8
" failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
type
,
tstrerror
(
code
));
ASSERT
(
0
);
}
return
code
;
}
/**
* @brief Data migration between multi-tier storage, including remove expired data.
* 1) firstly, remove expired DFileSet;
* 2) partition the tsdbFS by the expLevel and the estimated cost(e.g. 5s) to copy, and migrate
* DFileSet groups between multi-tier storage;
* 3) update the tsdbFS and CURRENT in the same transaction;
* 4) finish the migration
* @param pTsdb
* @param now
* @param maxSpeed
* @return int32_t
*/
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
,
int64_t
maxSpeed
)
{
int32_t
code
=
0
;
int32_t
retention
=
RETENTION_NO
;
retention
=
tsdbShouldDoRetention
(
pTsdb
,
now
);
if
(
retention
==
RETENTION_NO
)
{
goto
_exit
;
}
// step 1: process expire
code
=
tsdbProcessRetention
(
pTsdb
,
now
,
maxSpeed
,
retention
,
RETENTION_EXPIRED
);
if
(
code
<
0
)
goto
_exit
;
// step 2: process multi-tier migration
code
=
tsdbProcessRetention
(
pTsdb
,
now
,
maxSpeed
,
retention
,
RETENTION_MIGRATE
);
if
(
code
<
0
)
goto
_exit
;
_exit:
pTsdb
->
trimHdl
.
maxRetentFid
=
INT32_MIN
;
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d, tsdb do retention %d failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
retention
,
tstrerror
(
code
));
ASSERT
(
0
);
// tsdbFSRollback(pTsdb->pFS);
}
else
{
tsdbInfo
(
"vgId:%d, tsdb do retention %d succeed"
,
TD_VID
(
pTsdb
->
pVnode
),
retention
);
}
return
code
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录