Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0c7a4bfa
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0c7a4bfa
编写于
2月 20, 2023
作者:
B
Benguang Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: refactor syncBeginSnapshot and walBeginSnapshot for logRetention
上级
16bc8cb5
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
45 addition
and
88 deletion
+45
-88
include/libs/wal/wal.h
include/libs/wal/wal.h
+2
-1
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+23
-71
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+16
-12
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+3
-3
未找到文件。
include/libs/wal/wal.h
浏览文件 @
0c7a4bfa
...
...
@@ -66,6 +66,7 @@ typedef struct {
int64_t
commitVer
;
int64_t
appliedVer
;
int64_t
lastVer
;
int64_t
logRetention
;
}
SWalVer
;
#pragma pack(push, 1)
...
...
@@ -180,7 +181,7 @@ void walFsync(SWal *, bool force);
int32_t
walCommit
(
SWal
*
,
int64_t
ver
);
int32_t
walRollback
(
SWal
*
,
int64_t
ver
);
// notify that previous logs can be pruned safely
int32_t
walBeginSnapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walBeginSnapshot
(
SWal
*
,
int64_t
ver
,
int64_t
logRetention
);
int32_t
walEndSnapshot
(
SWal
*
);
int32_t
walRestoreFromSnapshot
(
SWal
*
,
int64_t
ver
);
// for tq
...
...
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
0c7a4bfa
...
...
@@ -472,7 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
taosThreadMutexLock
(
&
pSdb
->
filelock
);
if
(
pSdb
->
pWal
!=
NULL
)
{
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex
, 0
);
if
(
pSdb
->
sync
==
0
)
{
code
=
0
;
}
else
{
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
0c7a4bfa
...
...
@@ -270,86 +270,38 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
return
-
1
;
}
SyncIndex
beginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
endIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
bool
isEmpty
=
pSyncNode
->
pLogStore
->
syncLogIsEmpty
(
pSyncNode
->
pLogStore
);
if
(
isEmpty
||
!
(
lastApplyIndex
>=
beginIndex
&&
lastApplyIndex
<=
endIndex
))
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
", empty:%d, do not delete wal"
,
lastApplyIndex
,
isEmpty
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
int32_t
code
=
0
;
int64_t
logRetention
=
0
;
if
(
syncNodeIsMnode
(
pSyncNode
))
{
// mnode
int64_t
logRetention
=
SYNC_MNODE_LOG_RETENTION
;
SyncIndex
beginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
endIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
int64_t
logNum
=
endIndex
-
beginIndex
;
bool
isEmpty
=
pSyncNode
->
pLogStore
->
syncLogIsEmpty
(
pSyncNode
->
pLogStore
);
if
(
isEmpty
||
(
!
isEmpty
&&
logNum
<
logRetention
))
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
", log-num:%"
PRId64
", empty:%d, do not delete wal"
,
lastApplyIndex
,
logNum
,
isEmpty
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
goto
_DEL_WAL
;
logRetention
=
SYNC_MNODE_LOG_RETENTION
;
}
else
{
SyncIndex
beginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
endIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
bool
isEmpty
=
pSyncNode
->
pLogStore
->
syncLogIsEmpty
(
pSyncNode
->
pLogStore
);
if
(
isEmpty
||
!
(
lastApplyIndex
>=
beginIndex
&&
lastApplyIndex
<=
endIndex
))
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
", empty:%d, do not delete wal"
,
lastApplyIndex
,
isEmpty
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
// vnode
if
(
pSyncNode
->
replicaNum
>
1
)
{
// multi replicas
logRetention
=
SYNC_VNODE_LOG_RETENTION
;
}
}
lastApplyIndex
=
TMAX
(
lastApplyIndex
-
SYNC_VNODE_LOG_RETENTION
,
beginIndex
-
1
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
pSyncNode
->
minMatchIndex
=
syncMinMatchIndex
(
pSyncNode
);
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
int64_t
matchIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pMatchIndex
,
&
(
pSyncNode
->
peersId
[
i
]));
if
(
lastApplyIndex
>
matchIndex
)
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" is greater than match-index:%"
PRId64
" of dnode:%d, do not delete wal"
,
lastApplyIndex
,
matchIndex
,
DID
(
&
pSyncNode
->
peersId
[
i
]));
syncNodeRelease
(
pSyncNode
);
return
0
;
}
}
}
else
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
if
(
lastApplyIndex
>
pSyncNode
->
minMatchIndex
)
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" is greater than min-match-index:%"
PRId64
", do not delete wal"
,
lastApplyIndex
,
pSyncNode
->
minMatchIndex
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
}
else
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" candidate, do not delete wal"
,
lastApplyIndex
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
else
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" unknown state, do not delete wal"
,
lastApplyIndex
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
goto
_DEL_WAL
;
}
else
{
// one replica
goto
_DEL_WAL
;
if
(
pSyncNode
->
replicaNum
>
1
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
&&
pSyncNode
->
state
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" candidate or unknown state, do not delete wal"
,
lastApplyIndex
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
logRetention
=
TMAX
(
logRetention
,
lastApplyIndex
-
pSyncNode
->
minMatchIndex
);
}
_DEL_WAL:
...
...
@@ -366,7 +318,7 @@ _DEL_WAL:
atomic_store_64
(
&
pSyncNode
->
snapshottingIndex
,
lastApplyIndex
);
pSyncNode
->
snapshottingTime
=
taosGetTimestampMs
();
code
=
walBeginSnapshot
(
pData
->
pWal
,
lastApplyIndex
);
code
=
walBeginSnapshot
(
pData
->
pWal
,
lastApplyIndex
,
logRetention
);
if
(
code
==
0
)
{
sNTrace
(
pSyncNode
,
"wal snapshot begin, index:%"
PRId64
", last apply index:%"
PRId64
,
pSyncNode
->
snapshottingIndex
,
lastApplyIndex
);
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
0c7a4bfa
...
...
@@ -247,21 +247,23 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
return
0
;
}
int32_t
walBeginSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
int32_t
walBeginSnapshot
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
logRetention
)
{
taosThreadMutexLock
(
&
pWal
->
mutex
);
ASSERT
(
logRetention
>=
0
);
pWal
->
vers
.
verInSnapshotting
=
ver
;
wDebug
(
"vgId:%d, wal begin snapshot for version %"
PRId64
", first ver %"
PRId64
", last ver %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
pWal
->
vers
.
logRetention
=
logRetention
;
wDebug
(
"vgId:%d, wal begin snapshot for version %"
PRId64
", log retention %"
PRId64
" first ver %"
PRId64
", last ver %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
logRetention
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
// check file rolling
if
(
pWal
->
cfg
.
retentionPeriod
==
0
)
{
if
(
walGetLastFileSize
(
pWal
)
!=
0
)
{
if
(
walRollImpl
(
pWal
)
<
0
)
{
wError
(
"vgId:%d, failed to roll wal files since %s"
,
pWal
->
cfg
.
vgId
,
terrstr
());
goto
_err
;
}
if
(
walGetLastFileSize
(
pWal
)
!=
0
)
{
if
(
walRollImpl
(
pWal
)
<
0
)
{
wError
(
"vgId:%d, failed to roll wal files since %s"
,
pWal
->
cfg
.
vgId
,
terrstr
());
goto
_err
;
}
}
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
0
;
...
...
@@ -275,8 +277,9 @@ int32_t walEndSnapshot(SWal *pWal) {
taosThreadMutexLock
(
&
pWal
->
mutex
);
int64_t
ver
=
pWal
->
vers
.
verInSnapshotting
;
wDebug
(
"vgId:%d, wal end snapshot for version %"
PRId64
", first ver %"
PRId64
", last ver %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
wDebug
(
"vgId:%d, wal end snapshot for version %"
PRId64
", log retention %"
PRId64
" first ver %"
PRId64
", last ver %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
logRetention
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
if
(
ver
==
-
1
)
{
code
=
-
1
;
...
...
@@ -286,6 +289,7 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal
->
vers
.
snapshotVer
=
ver
;
int
ts
=
taosGetTimestampSec
();
ver
=
TMAX
(
ver
-
pWal
->
vers
.
logRetention
,
pWal
->
vers
.
firstVer
-
1
);
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pWal
->
pRefHash
,
pIter
);
...
...
source/libs/wal/test/walMetaTest.cpp
浏览文件 @
0c7a4bfa
...
...
@@ -264,7 +264,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) {
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
i
);
if
(
i
==
5
)
{
walBeginSnapshot
(
pWal
,
i
);
walBeginSnapshot
(
pWal
,
i
,
0
);
walEndSnapshot
(
pWal
);
}
}
...
...
@@ -301,7 +301,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ
(
pWal
->
vers
.
commitVer
,
i
);
}
walBeginSnapshot
(
pWal
,
i
-
1
);
walBeginSnapshot
(
pWal
,
i
-
1
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
verInSnapshotting
,
i
-
1
);
walEndSnapshot
(
pWal
);
ASSERT_EQ
(
pWal
->
vers
.
snapshotVer
,
i
-
1
);
...
...
@@ -317,7 +317,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ
(
pWal
->
vers
.
commitVer
,
i
);
}
code
=
walBeginSnapshot
(
pWal
,
i
-
1
);
code
=
walBeginSnapshot
(
pWal
,
i
-
1
,
0
);
ASSERT_EQ
(
code
,
0
);
code
=
walEndSnapshot
(
pWal
);
ASSERT_EQ
(
code
,
0
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录