Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b8ea469a
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b8ea469a
编写于
9月 23, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(tsdb): data migrate should not block data r/w and commit
上级
f0a37948
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
760 addition
and
74 deletion
+760
-74
include/common/tmsg.h
include/common/tmsg.h
+2
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+2
-2
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+12
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+5
-5
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+3
-2
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+38
-2
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+442
-1
source/dnode/vnode/src/tsdb/tsdbOpen.c
source/dnode/vnode/src/tsdb/tsdbOpen.c
+3
-0
source/dnode/vnode/src/tsdb/tsdbRetention2.c
source/dnode/vnode/src/tsdb/tsdbRetention2.c
+238
-53
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+11
-6
未找到文件。
include/common/tmsg.h
浏览文件 @
b8ea469a
...
...
@@ -873,7 +873,8 @@ int32_t tSerializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq);
int32_t
tDeserializeSTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
STrimDbReq
*
pReq
);
typedef
struct
{
int32_t
timestamp
;
int64_t
timestamp
;
// unit: millisecond
int32_t
maxSpeed
;
// 0 no limit, unit: bit/s
}
SVTrimDbReq
;
int32_t
tSerializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
);
...
...
source/common/src/tmsg.c
浏览文件 @
b8ea469a
...
...
@@ -2706,7 +2706,8 @@ int32_t tSerializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) {
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
timestamp
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
timestamp
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
maxSpeed
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -2719,7 +2720,8 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) {
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
timestamp
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
timestamp
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
maxSpeed
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
b8ea469a
...
...
@@ -1389,7 +1389,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
SVTrimDbReq
trimReq
=
{.
timestamp
=
taosGetTimestamp
Sec
()};
SVTrimDbReq
trimReq
=
{.
timestamp
=
taosGetTimestamp
Ms
(),
.
maxSpeed
=
1048576
<<
5
};
// TODO: use specified maxSpeed
int32_t
reqLen
=
tSerializeSVTrimDbReq
(
NULL
,
0
,
&
trimReq
);
int32_t
contLen
=
reqLen
+
sizeof
(
SMsgHead
);
...
...
@@ -1413,7 +1413,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
if
(
code
!=
0
)
{
mError
(
"vgId:%d, failed to send vnode-trim request to vnode since 0x%x"
,
pVgroup
->
vgId
,
code
);
}
else
{
mDebug
(
"vgId:%d, send vnode-trim request to vnode, time:%
d"
,
pVgroup
->
vgId
,
trimReq
.
timestamp
);
mDebug
(
"vgId:%d, send vnode-trim request to vnode, time:%
"
PRIi64
,
pVgroup
->
vgId
,
trimReq
.
timestamp
);
}
sdbRelease
(
pSdb
,
pVgroup
);
}
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
b8ea469a
...
...
@@ -58,6 +58,7 @@ typedef struct SDelFWriter SDelFWriter;
typedef
struct
SDelFReader
SDelFReader
;
typedef
struct
SRowIter
SRowIter
;
typedef
struct
STsdbFS
STsdbFS
;
typedef
struct
STsdbTrimHdl
STsdbTrimHdl
;
typedef
struct
SRowMerger
SRowMerger
;
typedef
struct
STsdbReadSnap
STsdbReadSnap
;
typedef
struct
SBlockInfo
SBlockInfo
;
...
...
@@ -243,6 +244,7 @@ int32_t tsdbFSClose(STsdb *pTsdb);
int32_t
tsdbFSCopy
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
void
tsdbFSDestroy
(
STsdbFS
*
pFS
);
int32_t
tDFileSetCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
tsdbFSUpdDel
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
,
STsdbFS
*
pFSNew
,
int32_t
maxFid
);
int32_t
tsdbFSCommit1
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSCommit2
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSRef
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
...
...
@@ -318,6 +320,12 @@ struct STsdbFS {
SArray
*
aDFileSet
;
// SArray<SDFileSet>
};
// Hand-shake state shared by the commit path and the retention (expire/migrate) path of one STsdb,
// used so that the two do not work on overlapping file-set ids (fid) at the same time.
struct
STsdbTrimHdl
{
// Short-lived guard: each path flips it 0 -> 1 with a compare-and-swap, re-checks the other
// side's fid, publishes its own fid, and then stores 0 again.
volatile
int8_t
state
;
// 0 idle 1 in use
// Largest fid published by a retention pass; tsdbOpen initializes it to INT32_MIN.
// tsdbStartCommit spins while this value is >= the smallest fid it is about to commit.
volatile
int32_t
maxRetentFid
;
// Smallest fid of the commit in flight; tsdbOpen sets it to INT32_MAX and tsdbEndCommit
// restores INT32_MAX on both its success and error exits.
// The retention passes spin while this value is <= the largest fid they want to touch.
volatile
int32_t
minCommitFid
;
};
struct
STsdb
{
char
*
path
;
SVnode
*
pVnode
;
...
...
@@ -326,6 +334,7 @@ struct STsdb {
SMemTable
*
mem
;
SMemTable
*
imem
;
STsdbFS
fs
;
STsdbTrimHdl
trimHdl
;
SLRUCache
*
lruCache
;
TdThreadMutex
lruMutex
;
};
...
...
@@ -559,6 +568,9 @@ struct SDFileSet {
SSttFile
*
aSttF
[
TSDB_MAX_STT_TRIGGER
];
};
// Mark a file set as expired by storing the sentinel -1 in its disk id.
#define SET_DFSET_EXPIRED(d) ((d)->diskId.id = -1)
// Non-zero when the file set carries the expired sentinel written by SET_DFSET_EXPIRED.
#define IS_DFSET_EXPIRED(d) ((d)->diskId.id == -1)
struct
SRowIter
{
TSDBROW
*
pRow
;
STSchema
*
pTSchema
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
b8ea469a
...
...
@@ -58,7 +58,7 @@ typedef struct STQ STQ;
typedef
struct
SVState
SVState
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SQWorker
SQHandle
;
typedef
struct
S
TrimDbHandle
STrimDbHandle
;
typedef
struct
S
VTrimDbHdl
SVTrimDbHdl
;
typedef
struct
STsdbKeepCfg
STsdbKeepCfg
;
typedef
struct
SMetaSnapReader
SMetaSnapReader
;
typedef
struct
SMetaSnapWriter
SMetaSnapWriter
;
...
...
@@ -145,7 +145,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC
int
tsdbClose
(
STsdb
**
pTsdb
);
int32_t
tsdbBegin
(
STsdb
*
pTsdb
);
int32_t
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
);
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
,
int32_t
maxSpeed
);
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int32_t
tsdbInsertTableData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
...
...
@@ -200,7 +200,7 @@ int32_t smaSyncPostCommit(SSma* pSma);
int32_t
smaAsyncPreCommit
(
SSma
*
pSma
);
int32_t
smaAsyncCommit
(
SSma
*
pSma
);
int32_t
smaAsyncPostCommit
(
SSma
*
pSma
);
int32_t
smaDoRetention
(
SSma
*
pSma
,
int64_t
now
);
int32_t
smaDoRetention
(
SSma
*
pSma
,
int64_t
now
,
int32_t
maxSpeed
);
int32_t
tdProcessTSmaCreate
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
msg
);
int32_t
tdProcessTSmaInsert
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
...
...
@@ -302,7 +302,7 @@ struct STsdbKeepCfg {
int32_t
keep1
;
int32_t
keep2
;
};
struct
S
TrimDbHandle
{
struct
S
VTrimDbHdl
{
volatile
int8_t
state
;
// 0 not in trim, 1 in trim
};
...
...
@@ -329,7 +329,7 @@ struct SVnode {
bool
restored
;
tsem_t
syncSem
;
SQHandle
*
pQuery
;
S
TrimDbHandle
trimDbH
;
S
VTrimDbHdl
trimDbH
;
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
b8ea469a
...
...
@@ -661,9 +661,10 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
*
* @param pSma
* @param now
* @param maxSpeed
* @return int32_t
*/
int32_t
smaDoRetention
(
SSma
*
pSma
,
int64_t
now
)
{
int32_t
smaDoRetention
(
SSma
*
pSma
,
int64_t
now
,
int32_t
maxSpeed
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
!
VND_IS_RSMA
(
pSma
->
pVnode
))
{
return
code
;
...
...
@@ -671,7 +672,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) {
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
if
(
pSma
->
pRSmaTsdb
[
i
])
{
code
=
tsdbDoRetention
(
pSma
->
pRSmaTsdb
[
i
],
now
);
code
=
tsdbDoRetention
(
pSma
->
pRSmaTsdb
[
i
],
now
,
maxSpeed
);
if
(
code
)
goto
_end
;
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
b8ea469a
...
...
@@ -756,6 +756,30 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
if
(
pTsdb
->
imem
->
nRow
>
0
)
{
int32_t
minCommitFid
=
tsdbKeyFid
(
pTsdb
->
imem
->
minKey
,
pCommitter
->
minutes
,
pCommitter
->
precision
);
int32_t
nLoops
=
0
;
_wait_retention_end:
while
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
maxRetentFid
)
>=
minCommitFid
)
{
if
(
++
nLoops
>
1000
)
{
nLoops
=
0
;
sched_yield
();
}
}
if
(
atomic_val_compare_exchange_8
(
&
pTsdb
->
trimHdl
.
state
,
0
,
1
)
==
0
)
{
if
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
maxRetentFid
)
>=
minCommitFid
)
{
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
goto
_wait_retention_end
;
}
atomic_store_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
,
minCommitFid
);
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
}
else
{
goto
_wait_retention_end
;
}
}
code
=
tsdbFSCopy
(
pTsdb
,
&
pCommitter
->
fs
);
if
(
code
)
goto
_err
;
...
...
@@ -962,14 +986,23 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
bool
inTrim
=
atomic_load_8
(
&
pTsdb
->
pVnode
->
trimDbH
.
state
);
ASSERT
(
eno
==
0
);
if
(
inTrim
)
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
int64_t
startTime
=
taosGetTimestampMs
();
code
=
tsdbFSCommit1
(
pTsdb
,
&
pCommitter
->
fs
);
if
(
code
)
goto
_err
;
if
(
code
)
{
if
(
inTrim
)
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
}
int64_t
endTime
=
taosGetTimestampMs
();
tsdbInfo
(
"vgId:%d, tsdb end commit - commit1 fsSize:%d cost: %"
PRIi64
" ms"
,
TD_VID
(
pTsdb
->
pVnode
),
(
int32_t
)
taosArrayGetSize
(
pCommitter
->
fs
.
aDFileSet
),
endTime
-
startTime
);
// lock
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
if
(
!
inTrim
)
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
// commit or rollback
code
=
tsdbFSCommit2
(
pTsdb
,
&
pCommitter
->
fs
);
...
...
@@ -977,6 +1010,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
}
tsdbInfo
(
"vgId:%d, tsdb end commit - commit2 cost: %"
PRIi64
" ms"
,
TD_VID
(
pTsdb
->
pVnode
),
taosGetTimestampMs
()
-
endTime
);
pTsdb
->
imem
=
NULL
;
...
...
@@ -986,6 +1020,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
tsdbUnrefMemTable
(
pMemTable
);
tsdbFSDestroy
(
&
pCommitter
->
fs
);
taosArrayDestroy
(
pCommitter
->
aTbDataP
);
atomic_store_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
,
INT32_MAX
);
// if (pCommitter->toMerge) {
// code = tsdbMerge(pTsdb);
...
...
@@ -996,6 +1031,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
return
code
;
_err:
atomic_store_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
,
INT32_MAX
);
tsdbError
(
"vgId:%d, tsdb end commit failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
b8ea469a
...
...
@@ -711,7 +711,7 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
_exit:
return
code
;
}
#if 0
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) {
int32_t code = 0;
char tfname[TSDB_FILENAME_LEN];
...
...
@@ -1045,6 +1045,447 @@ _err:
return code;
}
#endif
#if 1
/**
 * @brief Fold the result of a retention pass (pFSNew) into a file-system snapshot (pFS).
 *
 * Both arrays are walked with two cursors, which relies on them being ordered by ascending fid.
 * For every fid that is present in both snapshots and is <= maxFid:
 *   - if pFSNew marks the set as expired (IS_DFSET_EXPIRED), the entry is removed from pFS and
 *     its file descriptors are released with taosMemoryFree;
 *   - else, if pFSNew placed the set on a different disk, the head/data/sma/stt descriptors and
 *     the diskId of pFSNew are copied over the ones in pFS;
 *   - else (same disk) the entry is left as it is.
 * Entries that exist on one side only are skipped. The walk stops at the first pFSNew entry whose
 * fid is greater than maxFid, or as soon as either array is exhausted.
 *
 * @param pTsdb   owning tsdb (not used by the current implementation)
 * @param pFS     snapshot that is updated in place
 * @param pFSNew  snapshot produced by the retention pass; it is only read
 * @param maxFid  largest fid handled by the retention pass
 * @return int32_t always 0
 */
int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid) {
  int32_t code = 0;
  int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet);  // pFSNew is never modified in here
  int32_t iOld = 0;
  int32_t iNew = 0;

  (void)pTsdb;

  for (;;) {
    // pFS shrinks each time an expired set is removed, so its size is re-read on every round
    int32_t nOld = taosArrayGetSize(pFS->aDFileSet);
    if (iOld >= nOld || iNew >= nNew) break;

    SDFileSet *pSetOld = taosArrayGet(pFS->aDFileSet, iOld);
    SDFileSet *pSetNew = taosArrayGet(pFSNew->aDFileSet, iNew);

    if (pSetNew->fid > maxFid) break;

    // advance whichever cursor is behind until both point at the same fid
    if (pSetOld->fid < pSetNew->fid) {
      ++iOld;
      continue;
    }
    if (pSetOld->fid > pSetNew->fid) {
      ++iNew;
      continue;
    }

    if (IS_DFSET_EXPIRED(pSetNew)) {
      // expired: drop the entry together with its descriptors
      taosMemoryFree(pSetOld->pHeadF);
      taosMemoryFree(pSetOld->pDataF);
      taosMemoryFree(pSetOld->pSmaF);
      for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) {
        taosMemoryFree(pSetOld->aSttF[iStt]);
      }
      taosArrayRemove(pFS->aDFileSet, iOld);  // the next old entry slides into slot iOld
      ++iNew;
      continue;
    }

    bool sameDisk =
        (pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id);
    if (sameDisk) {
      // The migrate pass leaves a set untouched when it already lives on the wanted tier, so a
      // same-disk pair is legitimate and simply has nothing to merge.
      ++iOld;
      ++iNew;
      continue;
    }

    // migrated: same files, new location -> take over descriptors and disk id

    // head
    ASSERT(pSetOld->pHeadF->commitID == pSetNew->pHeadF->commitID);
    ASSERT(pSetOld->pHeadF->size == pSetNew->pHeadF->size);
    ASSERT(pSetOld->pHeadF->offset == pSetNew->pHeadF->offset);
    *pSetOld->pHeadF = *pSetNew->pHeadF;
    pSetOld->pHeadF->nRef = 1;

    // data
    ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size);
    *pSetOld->pDataF = *pSetNew->pDataF;
    pSetOld->pDataF->nRef = 1;

    // sma
    ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size);
    *pSetOld->pSmaF = *pSetNew->pSmaF;
    pSetOld->pSmaF->nRef = 1;

    // stt
    ASSERT(pSetOld->nSttF == pSetNew->nSttF);
    for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) {
      ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size);
      ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset);

      *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
      pSetOld->aSttF[iStt]->nRef = 1;
    }

    // set diskId
    pSetOld->diskId = pSetNew->diskId;

    ++iOld;
    ++iNew;
  }

  return code;
}
/**
 * @brief First commit phase: persist pFSNew as the CURRENT file of this tsdb.
 *
 * The snapshot is first written to "CURRENT.t" under <primary path>/<pTsdb->path>/ by
 * tsdbGnrtCurrent, and that file is then renamed to "CURRENT" in the same directory.
 *
 * @param pTsdb   tsdb whose CURRENT file is rewritten
 * @param pFSNew  file-system snapshot to persist
 * @return int32_t 0 on success; otherwise the code returned by tsdbGnrtCurrent, or
 *                 TAOS_SYSTEM_ERROR() of the rename result
 */
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) {
  char currentTmp[TSDB_FILENAME_LEN];
  char current[TSDB_FILENAME_LEN];

  snprintf(currentTmp, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
           pTsdb->path, TD_DIRSEP);
  snprintf(current, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
           pTsdb->path, TD_DIRSEP);

  // step 1: generate CURRENT.t
  int32_t code = tsdbGnrtCurrent(pTsdb, pFSNew, currentTmp);
  bool    failed = (code != 0);

  // step 2: move it over CURRENT
  if (!failed) {
    code = taosRenameFile(currentTmp, current);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(code);
      failed = true;
    }
  }

  if (failed) {
    tsdbError("vgId:%d, tsdb fs commit phase 1 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  }
  return code;
}
/**
 * @brief Second commit phase: make the in-memory file system (pTsdb->fs) match pFSNew.
 *
 * The del file is reconciled first, then the data file sets are merged with two cursors (which
 * relies on both arrays being ordered by ascending fid):
 *   - fid only in pTsdb->fs  -> the set is removed;
 *   - fid only in pFSNew     -> a copy of the set is inserted;
 *   - fid in both            -> each descriptor is replaced when the set changed disk or the
 *                               commitID differs, otherwise only sizes are refreshed.
 * Whenever a descriptor of pTsdb->fs is replaced or dropped its reference count is decremented;
 * when the count reaches zero the file is removed from disk and the descriptor is freed.
 * A replacement descriptor is always allocated before the old one is released, so an allocation
 * failure never leaves a NULL or dangling descriptor inside pTsdb->fs.
 *
 * NOTE(review): the callers shown next to this function take pTsdb->rwLock for writing before
 * calling it - confirm that this holds for every caller.
 *
 * @param pTsdb   tsdb whose live file system is updated
 * @param pFSNew  target snapshot; it is only read
 * @return int32_t 0 on success, TSDB_CODE_OUT_OF_MEMORY when an allocation or array insert fails
 */
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
  int32_t code = 0;
  int32_t nRef;
  char    fname[TSDB_FILENAME_LEN];

  // del
  if (pFSNew->pDelFile) {
    SDelFile *pDelFileOld = pTsdb->fs.pDelFile;

    if (pDelFileOld == NULL || (pDelFileOld->commitID != pFSNew->pDelFile->commitID)) {
      SDelFile *pDelFileNew = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
      if (pDelFileNew == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      *pDelFileNew = *pFSNew->pDelFile;
      pDelFileNew->nRef = 1;
      pTsdb->fs.pDelFile = pDelFileNew;

      if (pDelFileOld) {
        nRef = atomic_sub_fetch_32(&pDelFileOld->nRef, 1);
        if (nRef == 0) {
          tsdbDelFileName(pTsdb, pDelFileOld, fname);
          taosRemoveFile(fname);
          taosMemoryFree(pDelFileOld);
        }
      }
    }
  } else {
    ASSERT(pTsdb->fs.pDelFile == NULL);
  }

  // data
  int32_t iOld = 0;
  int32_t iNew = 0;
  while (true) {
    // the live array grows and shrinks inside the loop, so its size is re-read on every round
    int32_t   nOld = taosArrayGetSize(pTsdb->fs.aDFileSet);
    int32_t   nNew = taosArrayGetSize(pFSNew->aDFileSet);
    SDFileSet fSet;
    int8_t    sameDisk;

    if (iOld >= nOld && iNew >= nNew) break;

    SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
    SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL;

    if (pSetOld && pSetNew) {
      if (pSetOld->fid == pSetNew->fid) {
        goto _merge_old_and_new;
      } else if (pSetOld->fid < pSetNew->fid) {
        goto _remove_old;
      } else {
        goto _add_new;
      }
    } else if (pSetOld) {
      goto _remove_old;
    } else {
      goto _add_new;
    }

  _merge_old_and_new:
    sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id));

    // head
    if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) {
      SHeadFile *pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
      if (pHeadF == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      *pHeadF = *pSetNew->pHeadF;
      pHeadF->nRef = 1;

      // pSetOld->diskId still names the old location here; it is only updated at the very end
      nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1);
      if (nRef == 0) {
        tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname);
        taosRemoveFile(fname);
        taosMemoryFree(pSetOld->pHeadF);
      }
      pSetOld->pHeadF = pHeadF;
    } else {
      ASSERT(pSetOld->pHeadF->size == pSetNew->pHeadF->size);
      ASSERT(pSetOld->pHeadF->offset == pSetNew->pHeadF->offset);
    }

    // data
    if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) {
      SDataFile *pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
      if (pDataF == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      *pDataF = *pSetNew->pDataF;
      pDataF->nRef = 1;

      nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1);
      if (nRef == 0) {
        tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname);
        taosRemoveFile(fname);
        taosMemoryFree(pSetOld->pDataF);
      }
      pSetOld->pDataF = pDataF;
    } else {
      ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size);
      pSetOld->pDataF->size = pSetNew->pDataF->size;
    }

    // sma
    if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) {
      SSmaFile *pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
      if (pSmaF == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      *pSmaF = *pSetNew->pSmaF;
      pSmaF->nRef = 1;

      nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1);
      if (nRef == 0) {
        tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname);
        taosRemoveFile(fname);
        taosMemoryFree(pSetOld->pSmaF);
      }
      pSetOld->pSmaF = pSmaF;
    } else {
      ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size);
      pSetOld->pSmaF->size = pSetNew->pSmaF->size;
    }

    // stt
    if (sameDisk) {
      if (pSetNew->nSttF > pSetOld->nSttF) {
        // exactly one stt file was appended; this must be a comparison, not an assignment
        ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1);

        SSttFile *pSttF = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
        if (pSttF == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        *pSttF = *pSetNew->aSttF[pSetOld->nSttF];
        pSttF->nRef = 1;
        pSetOld->aSttF[pSetOld->nSttF] = pSttF;
        pSetOld->nSttF++;
      } else if (pSetNew->nSttF < pSetOld->nSttF) {
        // all previous stt files were replaced by a single new one
        ASSERT(pSetNew->nSttF == 1);

        SSttFile *pSttF = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
        if (pSttF == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        *pSttF = *pSetNew->aSttF[0];
        pSttF->nRef = 1;

        for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
          SSttFile *pSttFile = pSetOld->aSttF[iStt];
          nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
          if (nRef == 0) {
            tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
            taosRemoveFile(fname);
            taosMemoryFree(pSttFile);
          }
          pSetOld->aSttF[iStt] = NULL;
        }

        pSetOld->nSttF = 1;
        pSetOld->aSttF[0] = pSttF;
      } else {
        for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
          if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
            SSttFile *pSttF = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
            if (pSttF == NULL) {
              code = TSDB_CODE_OUT_OF_MEMORY;
              goto _err;
            }
            *pSttF = *pSetNew->aSttF[iStt];
            pSttF->nRef = 1;

            SSttFile *pSttFile = pSetOld->aSttF[iStt];
            nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
            if (nRef == 0) {
              tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
              taosRemoveFile(fname);
              taosMemoryFree(pSttFile);
            }
            pSetOld->aSttF[iStt] = pSttF;
          } else {
            ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size);
            ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset);
          }
        }
      }
    } else {
      // the set moved to another disk: every stt descriptor is replaced
      ASSERT(pSetOld->nSttF == pSetNew->nSttF);
      for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
        SSttFile *pSttF = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
        if (pSttF == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        *pSttF = *pSetNew->aSttF[iStt];
        pSttF->nRef = 1;

        SSttFile *pSttFile = pSetOld->aSttF[iStt];
        nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
        if (nRef == 0) {
          tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
          taosRemoveFile(fname);
          taosMemoryFree(pSttFile);
        }
        pSetOld->aSttF[iStt] = pSttF;
      }
    }

    // must stay last: the old diskId was needed above to name the files being removed
    if (!sameDisk) {
      pSetOld->diskId = pSetNew->diskId;
    }

    iOld++;
    iNew++;
    continue;

  _remove_old:
    nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1);
    if (nRef == 0) {
      tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname);
      taosRemoveFile(fname);
      taosMemoryFree(pSetOld->pHeadF);
    }

    nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1);
    if (nRef == 0) {
      tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname);
      taosRemoveFile(fname);
      taosMemoryFree(pSetOld->pDataF);
    }

    nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1);
    if (nRef == 0) {
      tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname);
      taosRemoveFile(fname);
      taosMemoryFree(pSetOld->pSmaF);
    }

    for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
      nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1);
      if (nRef == 0) {
        tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname);
        taosRemoveFile(fname);
        taosMemoryFree(pSetOld->aSttF[iStt]);
      }
    }

    // iOld is not advanced: the next entry slides into the freed slot
    taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
    continue;

  _add_new:
    fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1};

    // head
    fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
    if (fSet.pHeadF == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    *fSet.pHeadF = *pSetNew->pHeadF;
    fSet.pHeadF->nRef = 1;

    // data
    fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
    if (fSet.pDataF == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    *fSet.pDataF = *pSetNew->pDataF;
    fSet.pDataF->nRef = 1;

    // sma
    fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
    if (fSet.pSmaF == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    *fSet.pSmaF = *pSetNew->pSmaF;
    fSet.pSmaF->nRef = 1;

    // stt
    ASSERT(pSetNew->nSttF == 1);
    fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
    if (fSet.aSttF[0] == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    *fSet.aSttF[0] = *pSetNew->aSttF[0];
    fSet.aSttF[0]->nRef = 1;

    if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    iOld++;
    iNew++;
    continue;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
#endif
int32_t
tsdbFSRef
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
)
{
int32_t
code
=
0
;
int32_t
nRef
;
...
...
source/dnode/vnode/src/tsdb/tsdbOpen.c
浏览文件 @
b8ea469a
...
...
@@ -71,6 +71,9 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
goto
_err
;
}
pTsdb
->
trimHdl
.
maxRetentFid
=
INT32_MIN
;
pTsdb
->
trimHdl
.
minCommitFid
=
INT32_MAX
;
tsdbDebug
(
"vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d"
,
TD_VID
(
pVnode
),
pTsdb
->
path
,
pTsdb
->
keepCfg
.
days
,
pTsdb
->
keepCfg
.
keep0
,
pTsdb
->
keepCfg
.
keep1
,
pTsdb
->
keepCfg
.
keep2
);
...
...
source/dnode/vnode/src/tsdb/tsdbRetention2.c
浏览文件 @
b8ea469a
...
...
@@ -15,16 +15,18 @@
#include "tsdb.h"
static
bool
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
if
(
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
)
==
0
)
{
return
false
;
}
enum
{
RETENTION_NO
=
0
,
RETENTION_EXPIRED
=
1
,
RETENTION_MIGRATE
=
2
};
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
0
);
if
(
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
)
<
0
)
{
return
true
;
}
// Tuning constants for tiered-storage migration. When collecting file sets for one migrate
// batch, the batch stops growing once (accumulated file size / MIGRATE_MAX_SPEED) exceeds
// MIGRATE_MIN_COST.
#define MIGRATE_MIN_FSIZE (1048576 << 9) // 512 MB
#define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB - NOTE(review): presumably bytes per second, confirm the unit
#define MIGRATE_MIN_COST (5) // second
static
bool
tsdbShouldDoMigrate
(
STsdb
*
pTsdb
);
static
int32_t
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
);
static
int32_t
tsdbProcessExpire
(
STsdb
*
pTsdb
,
int64_t
now
,
int32_t
retention
);
static
int32_t
tsdbProcessMigrate
(
STsdb
*
pTsdb
,
int64_t
now
,
int32_t
maxSpeed
,
int32_t
retention
);
static
bool
tsdbShouldDoMigrate
(
STsdb
*
pTsdb
)
{
if
(
tfsGetLevel
(
pTsdb
->
pVnode
->
pTfs
)
<
2
)
{
return
false
;
}
...
...
@@ -33,67 +35,203 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
if
(
keepCfg
->
keep0
==
keepCfg
->
keep1
&&
keepCfg
->
keep1
==
keepCfg
->
keep2
)
{
return
false
;
}
return
true
;
}
static
int32_t
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
int32_t
retention
=
RETENTION_NO
;
if
(
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
)
==
0
)
{
return
retention
;
}
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
0
);
if
(
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
)
<
0
)
{
retention
|=
RETENTION_EXPIRED
;
}
if
(
!
tsdbShouldDoMigrate
(
pTsdb
))
{
return
retention
;
}
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
);
iSet
++
)
{
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
);
++
iSet
)
{
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
keepCfg
,
now
);
SDiskID
did
;
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
if
(
expLevel
==
pSet
->
diskId
.
level
)
continue
;
if
(
expLevel
>
0
)
{
retention
|=
RETENTION_MIGRATE
;
break
;
}
}
return
retention
;
}
static
int32_t
tsdbProcessExpire
(
STsdb
*
pTsdb
,
int64_t
now
,
int32_t
retention
)
{
int32_t
code
=
0
;
int32_t
nLoops
=
0
;
int32_t
maxFid
=
INT32_MIN
;
STsdbFS
fs
=
{
0
};
STsdbFS
fsLatest
=
{
0
};
if
(
!
(
retention
&
RETENTION_EXPIRED
))
{
goto
_exit
;
}
code
=
tsdbFSCopy
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_exit
;
int32_t
fsSize
=
taosArrayGetSize
(
fs
.
aDFileSet
);
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
iSet
++
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
expLevel
<
0
)
{
return
true
;
SET_DFSET_EXPIRED
(
pSet
);
if
(
pSet
->
fid
>
maxFid
)
maxFid
=
pSet
->
fid
;
}
else
{
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
return
false
;
}
break
;
}
}
if
(
did
.
level
==
pSet
->
diskId
.
level
)
continue
;
if
(
maxFid
==
INT32_MIN
)
goto
_exit
;
return
true
;
_wait_commit_end:
while
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
)
<=
maxFid
)
{
if
(
++
nLoops
>
1000
)
{
nLoops
=
0
;
sched_yield
();
printf
(
"%s:%d sche_yield() minCommitFid:%d maxFid:%d
\n
"
,
__func__
,
__LINE__
,
pTsdb
->
trimHdl
.
minCommitFid
,
maxFid
);
}
}
if
(
atomic_val_compare_exchange_8
(
&
pTsdb
->
trimHdl
.
state
,
0
,
1
)
==
0
)
{
if
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
)
<=
maxFid
)
{
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
goto
_wait_commit_end
;
}
atomic_store_32
(
&
pTsdb
->
trimHdl
.
maxRetentFid
,
maxFid
);
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
}
else
{
goto
_wait_commit_end
;
}
return
false
;
_merge_fs:
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
if
((
code
=
tsdbFSCopy
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 1) merge tsdbFSNew and pTsdb->fs
if
((
code
=
tsdbFSUpdDel
(
pTsdb
,
&
fsLatest
,
&
fs
,
maxFid
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 2) save CURRENT
if
((
code
=
tsdbFSCommit1
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 3) apply the tsdbFS to pTsdb->fs
if
((
code
=
tsdbFSCommit2
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
_exit:
tsdbFSDestroy
(
&
fs
);
tsdbFSDestroy
(
&
fsLatest
);
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d, tsdb do retention(expire) failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
ASSERT
(
0
);
}
return
code
;
}
/**
* @brief Data migration between multi-tier storage, including remove expired data.
* 1) firstly, remove expired DFileSet;
* 2) partition the tsdbFS by the expLevel and fileSize(e.g. 500G, configurable), and migrate DFileSet groups between multi-tier storage;
* 3) update the tsdbFS and CURRENT in the same transaction;
* 4) finish
* @brief
*
* @param pTsdb
* @param now
* @param retention
* @return int32_t
*/
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
static
int32_t
tsdbProcessMigrate
(
STsdb
*
pTsdb
,
int64_t
now
,
int32_t
maxSpeed
,
int32_t
retention
)
{
int32_t
code
=
0
;
int32_t
nLoops
=
0
;
int32_t
maxFid
=
INT32_MIN
;
int64_t
fSize
=
0
;
STsdbFS
fs
=
{
0
};
STsdbFS
fsLatest
=
{
0
};
if
(
!
tsdbShouldDoRetention
(
pTsdb
,
now
))
{
return
code
;
if
(
!
(
retention
&
RETENTION_MIGRATE
))
{
goto
_exit
;
}
// do retention
STsdbFS
fs
;
_migrate_loop:
// reset
maxFid
=
INT32_MIN
;
fSize
=
0
;
code
=
tsdbFSCopy
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_exit
;
int32_t
fsSize
=
taosArrayGetSize
(
fs
.
aDFileSet
);
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
++
iSet
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
expLevel
>
0
)
{
ASSERT
(
pSet
->
fid
>
maxFid
);
maxFid
=
pSet
->
fid
;
fSize
+=
(
pSet
->
pDataF
->
size
+
pSet
->
pHeadF
->
size
+
pSet
->
pSmaF
->
size
);
if
(
fSize
/
MIGRATE_MAX_SPEED
>
MIGRATE_MIN_COST
)
{
break
;
}
for
(
int32_t
iStt
=
0
;
iStt
<
pSet
->
nSttF
;
++
iStt
)
{
fSize
+=
pSet
->
aSttF
[
iStt
]
->
size
;
}
if
(
fSize
/
MIGRATE_MAX_SPEED
>
MIGRATE_MIN_COST
)
{
break
;
}
}
}
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
fs
.
aDFileSet
);
iSet
++
)
{
if
(
maxFid
==
INT32_MIN
)
goto
_exit
;
_wait_commit_end:
while
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
)
<=
maxFid
)
{
if
(
++
nLoops
>
1000
)
{
nLoops
=
0
;
sched_yield
();
printf
(
"%s:%d sche_yield()
\n
"
,
__func__
,
__LINE__
);
}
}
if
(
atomic_val_compare_exchange_8
(
&
pTsdb
->
trimHdl
.
state
,
0
,
1
)
==
0
)
{
if
(
atomic_load_32
(
&
pTsdb
->
trimHdl
.
minCommitFid
)
<=
maxFid
)
{
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
goto
_wait_commit_end
;
}
atomic_store_32
(
&
pTsdb
->
trimHdl
.
maxRetentFid
,
maxFid
);
atomic_store_8
(
&
pTsdb
->
trimHdl
.
state
,
0
);
}
else
{
goto
_wait_commit_end
;
}
// migrate
for
(
int32_t
iSet
=
0
;
iSet
<
fsSize
;
++
iSet
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
pSet
->
fid
>
maxFid
)
break
;
if
(
expLevel
<
0
)
{
taosMemoryFree
(
pSet
->
pHeadF
);
taosMemoryFree
(
pSet
->
pDataF
);
taosMemoryFree
(
pSet
->
aSttF
[
0
]);
taosMemoryFree
(
pSet
->
pSmaF
);
taosArrayRemove
(
fs
.
aDFileSet
,
iSet
);
iSet
--
;
}
else
{
if
(
expLevel
==
0
)
continue
;
SET_DFSET_EXPIRED
(
pSet
);
}
else
if
(
expLevel
>
0
)
{
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
code
=
terrno
;
goto
_exit
;
...
...
@@ -101,40 +239,87 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if
(
did
.
level
==
pSet
->
diskId
.
level
)
continue
;
// copy file to new disk
(todo)
// copy file to new disk
SDFileSet
fSet
=
*
pSet
;
fSet
.
diskId
=
did
;
code
=
tsdbDFileSetCopy
(
pTsdb
,
pSet
,
&
fSet
);
if
(
code
)
goto
_e
rr
;
if
(
code
)
goto
_e
xit
;
code
=
tsdbFSUpsertFSet
(
&
fs
,
&
fSet
);
if
(
code
)
goto
_e
rr
;
if
(
code
)
goto
_e
xit
;
}
}
// do change fs
code
=
tsdbFSCommit1
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_err
;
_merge_fs:
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
code
=
tsdbFSCommit2
(
pTsdb
,
&
fs
);
if
(
code
)
{
if
((
code
=
tsdbFSCopy
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
goto
_exit
;
}
// 1) merge tsdbFSNew and pTsdb->fs
if
((
code
=
tsdbFSUpdDel
(
pTsdb
,
&
fsLatest
,
&
fs
,
maxFid
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 2) save CURRENT
if
((
code
=
tsdbFSCommit1
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// 3) apply the tsdbFS to pTsdb->fs
if
((
code
=
tsdbFSCommit2
(
pTsdb
,
&
fsLatest
)))
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbFSDestroy
(
&
fs
);
_exit:
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d, tsdb do retention(migrate) failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
ASSERT
(
0
);
}
return
code
;
}
_err:
tsdbError
(
"vgId:%d, tsdb do retention failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
ASSERT
(
0
);
// tsdbFSRollback(pTsdb->pFS);
/**
 * @brief Run one retention pass on a tsdb: handle expired data first, then
 *        migrate data file sets between storage tiers.
 *
 * Steps:
 *   1) ask tsdbShouldDoRetention() which kind of work is pending and leave
 *      early when it reports RETENTION_NO;
 *   2) tsdbProcessExpire() deals with expired DFileSets;
 *   3) tsdbProcessMigrate() moves DFileSets between tiers;
 *   4) on every exit path, reset trimHdl.maxRetentFid to INT32_MIN.
 *
 * @param pTsdb    tsdb instance to process
 * @param now      reference time forwarded to the helpers
 *                 (NOTE(review): presumably the trim request timestamp - confirm the unit)
 * @param maxSpeed speed limit forwarded to tsdbProcessMigrate()
 *                 (NOTE(review): presumably 0 means "no limit" - confirm against SVTrimDbReq)
 * @return int32_t 0 on success, otherwise the code returned by the failing step
 */
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int32_t maxSpeed) {
  int32_t code = 0;
  int32_t retention = tsdbShouldDoRetention(pTsdb, now);

  if (retention == RETENTION_NO) {
    goto _exit;
  }

  // step 1: process expire
  // Any non-zero code is a failure; this matches the check in _exit and in the caller,
  // so an error from this step can never be overwritten by step 2.
  code = tsdbProcessExpire(pTsdb, now, retention);
  if (code != 0) goto _exit;

  // step 2: process multi-tier migration (falls through to _exit either way)
  code = tsdbProcessMigrate(pTsdb, now, maxSpeed, retention);

_exit:
  // The migrate path publishes maxRetentFid with atomic_store_32(); reset it the same way
  // so that concurrent readers never observe a plain (racy) write.
  atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN);

  if (code != 0) {
    tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    // NOTE(review): asserting here aborts on any retention failure; a rollback
    // (tsdbFSRollback) was sketched by the original author but is not wired in.
    ASSERT(0);
  }
  return code;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
b8ea469a
...
...
@@ -417,24 +417,29 @@ void *vnodeProcessTrimReqFunc(void *param) {
setThreadName
(
"vnode-trim"
);
// process
code
=
tsdbDoRetention
(
pVnode
->
pTsdb
,
pReq
->
trimReq
.
timestamp
);
code
=
tsdbDoRetention
(
pVnode
->
pTsdb
,
pReq
->
trimReq
.
timestamp
,
pReq
->
trimReq
.
maxSpeed
);
if
(
code
)
goto
_exit
;
code
=
smaDoRetention
(
pVnode
->
pSma
,
pReq
->
trimReq
.
timestamp
);
code
=
smaDoRetention
(
pVnode
->
pSma
,
pReq
->
trimReq
.
timestamp
,
pReq
->
trimReq
.
maxSpeed
);
if
(
code
)
goto
_exit
;
_exit:
vInfo
(
"vgId:%d, trim vnode thread finished, time:%d"
,
TD_VID
(
pVnode
),
pReq
->
trimReq
.
timestamp
);
oldVal
=
atomic_val_compare_exchange_8
(
&
pVnode
->
trimDbH
.
state
,
1
,
0
);
ASSERT
(
oldVal
==
1
);
taosMemoryFree
(
pReq
);
if
(
code
)
{
vError
(
"vgId:%d, trim vnode thread failed since %s, time:%"
PRIi64
,
TD_VID
(
pVnode
),
tstrerror
(
code
),
pReq
->
trimReq
.
timestamp
);
}
else
{
vInfo
(
"vgId:%d, trim vnode thread finished, time:%"
PRIi64
,
TD_VID
(
pVnode
),
pReq
->
trimReq
.
timestamp
);
}
return
NULL
;
}
static
int32_t
vnodeProcessTrimReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
int32_t
code
=
0
;
SVndTrimDbReq
*
pVndTrimReq
=
taosMemoryMalloc
(
sizeof
(
SVndTrimDbReq
));
S
TrimDbHandle
*
pHandle
=
&
pVnode
->
trimDbH
;
S
VTrimDbHdl
*
pHandle
=
&
pVnode
->
trimDbH
;
if
(
!
pVndTrimReq
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -450,7 +455,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
}
if
(
atomic_val_compare_exchange_8
(
&
pHandle
->
state
,
0
,
1
)
!=
0
)
{
vInfo
(
"vgId:%d, trim vnode request will not be processed since duplicated req, time:%
d"
,
TD_VID
(
pVnode
),
vInfo
(
"vgId:%d, trim vnode request will not be processed since duplicated req, time:%
"
PRIi64
,
TD_VID
(
pVnode
),
pVndTrimReq
->
trimReq
.
timestamp
);
taosMemoryFree
(
pVndTrimReq
);
goto
_exit
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录