Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
1c2cbfa0
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
1c2cbfa0
编写于
12月 20, 2018
作者:
D
dongeforever
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Refresh the message store config to dledger config
上级
5bf113a1
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
25 addition
and
18 deletion
+25
-18
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
...a/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+19
-16
store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java
...a/org/apache/rocketmq/store/dledger/MixCommitlogTest.java
+6
-2
未找到文件。
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
浏览文件 @
1c2cbfa0
...
...
@@ -70,11 +70,6 @@ public class DLedgerCommitLog extends CommitLog {
private
long
dividedCommitlogOffset
=
-
1
;
//The old commitlog should be deleted before the dledger commitlog
private
final
boolean
originalDledgerEnableForceClean
;
private
final
AtomicBoolean
hasSetOriginalDledgerEnableForceClean
=
new
AtomicBoolean
(
false
);
private
boolean
isInrecoveringOldCommitlog
=
false
;
public
DLedgerCommitLog
(
final
DefaultMessageStore
defaultMessageStore
)
{
...
...
@@ -87,7 +82,7 @@ public class DLedgerCommitLog extends CommitLog {
dLedgerConfig
.
setStoreBaseDir
(
defaultMessageStore
.
getMessageStoreConfig
().
getStorePathRootDir
());
dLedgerConfig
.
setMappedFileSizeForEntryData
(
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
());
dLedgerConfig
.
setDeleteWhen
(
defaultMessageStore
.
getMessageStoreConfig
().
getDeleteWhen
());
originalDledgerEnableForceClean
=
dLedgerConfig
.
isEnableDiskForceClean
(
);
dLedgerConfig
.
setFileReservedHours
(
defaultMessageStore
.
getMessageStoreConfig
().
getFileReservedTime
()
+
1
);
id
=
Integer
.
valueOf
(
dLedgerConfig
.
getSelfId
().
substring
(
1
))
+
1
;
dLedgerServer
=
new
DLedgerServer
(
dLedgerConfig
);
dLedgerFileStore
=
(
DLedgerMmapFileStore
)
dLedgerServer
.
getdLedgerStore
();
...
...
@@ -112,6 +107,17 @@ public class DLedgerCommitLog extends CommitLog {
return
true
;
}
private
void
refreshConfig
()
{
dLedgerConfig
.
setEnableDiskForceClean
(
defaultMessageStore
.
getMessageStoreConfig
().
isCleanFileForciblyEnable
());
dLedgerConfig
.
setDeleteWhen
(
defaultMessageStore
.
getMessageStoreConfig
().
getDeleteWhen
());
dLedgerConfig
.
setFileReservedHours
(
defaultMessageStore
.
getMessageStoreConfig
().
getFileReservedTime
()
+
1
);
}
private
void
disableDeleteDledger
()
{
dLedgerConfig
.
setEnableDiskForceClean
(
false
);
dLedgerConfig
.
setFileReservedHours
(
Integer
.
MAX_VALUE
);
}
@Override
public
void
start
()
{
dLedgerServer
.
startup
();
...
...
@@ -177,11 +183,11 @@ public class DLedgerCommitLog extends CommitLog {
final
boolean
cleanImmediately
)
{
if
(
mappedFileQueue
.
getMappedFiles
().
isEmpty
())
{
if
(
hasSetOriginalDledgerEnableForceClean
.
compareAndSet
(
false
,
true
))
{
dLedgerConfig
.
setEnableDiskForceClean
(
originalDledgerEnableForceClean
);
}
refreshConfig
();
//To prevent too much log in defaultMessageStore
return
Integer
.
MAX_VALUE
;
}
else
{
disableDeleteDledger
();
}
int
count
=
super
.
deleteExpiredFile
(
expiredTime
,
deleteFilesInterval
,
intervalForcibly
,
cleanImmediately
);
if
(
count
>
0
||
mappedFileQueue
.
getMappedFiles
().
size
()
!=
1
)
{
...
...
@@ -251,16 +257,14 @@ public class DLedgerCommitLog extends CommitLog {
return
null
;
}
private
void
recover
(
long
maxPhyOffsetOfConsumeQueue
,
boolean
lastOk
)
{
private
void
recover
(
long
maxPhyOffsetOfConsumeQueue
)
{
dLedgerFileStore
.
load
();
if
(
dLedgerFileList
.
getMappedFiles
().
size
()
>
0
)
{
dLedgerFileStore
.
recover
();
dividedCommitlogOffset
=
dLedgerFileList
.
getFirstMappedFile
().
getFileFromOffset
();
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
if
(
mappedFile
!=
null
)
{
dLedgerConfig
.
setEnableDiskForceClean
(
false
);
}
else
{
hasSetOriginalDledgerEnableForceClean
.
set
(
true
);
disableDeleteDledger
();
}
long
maxPhyOffset
=
dLedgerFileList
.
getMaxWrotePosition
();
// Clear ConsumeQueue redundant data
...
...
@@ -277,7 +281,6 @@ public class DLedgerCommitLog extends CommitLog {
isInrecoveringOldCommitlog
=
false
;
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
if
(
mappedFile
==
null
)
{
hasSetOriginalDledgerEnableForceClean
.
set
(
true
);
return
;
}
ByteBuffer
byteBuffer
=
mappedFile
.
sliceByteBuffer
();
...
...
@@ -309,12 +312,12 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public
void
recoverNormally
(
long
maxPhyOffsetOfConsumeQueue
)
{
recover
(
maxPhyOffsetOfConsumeQueue
,
true
);
recover
(
maxPhyOffsetOfConsumeQueue
);
}
@Override
public
void
recoverAbnormally
(
long
maxPhyOffsetOfConsumeQueue
)
{
recover
(
maxPhyOffsetOfConsumeQueue
,
false
);
recover
(
maxPhyOffsetOfConsumeQueue
);
}
@Override
...
...
store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java
浏览文件 @
1c2cbfa0
...
...
@@ -133,6 +133,7 @@ public class MixCommitlogTest extends MessageStoreTestBase {
{
DefaultMessageStore
dledgerStore
=
createDledgerMessageStore
(
base
,
group
,
"n0"
,
peers
,
null
,
true
,
0
);
DLedgerCommitLog
dLedgerCommitLog
=
(
DLedgerCommitLog
)
dledgerStore
.
getCommitLog
();
Assert
.
assertTrue
(
dledgerStore
.
getMessageStoreConfig
().
isCleanFileForciblyEnable
());
Assert
.
assertFalse
(
dLedgerCommitLog
.
getdLedgerServer
().
getdLedgerConfig
().
isEnableDiskForceClean
());
Assert
.
assertEquals
(
dividedOffset
,
dLedgerCommitLog
.
getDividedCommitlogOffset
());
Thread
.
sleep
(
2000
);
...
...
@@ -143,6 +144,7 @@ public class MixCommitlogTest extends MessageStoreTestBase {
Assert
.
assertEquals
(
0
,
dledgerStore
.
dispatchBehindBytes
());
Assert
.
assertEquals
(
0
,
dledgerStore
.
getMinPhyOffset
());
maxPhysicalOffset
=
dledgerStore
.
getMaxPhyOffset
();
Assert
.
assertTrue
(
maxPhysicalOffset
>
0
);
doGetMessages
(
dledgerStore
,
topic
,
0
,
2000
,
0
);
...
...
@@ -156,13 +158,15 @@ public class MixCommitlogTest extends MessageStoreTestBase {
}
Assert
.
assertEquals
(
dividedOffset
,
dledgerStore
.
getMinPhyOffset
());
Assert
.
assertEquals
(
maxPhysicalOffset
,
dledgerStore
.
getMaxPhyOffset
());
Assert
.
assertTrue
(
dledgerStore
.
getMessageStoreConfig
().
isCleanFileForciblyEnable
());
Assert
.
assertTrue
(
dLedgerCommitLog
.
getdLedgerServer
().
getdLedgerConfig
().
isEnableDiskForceClean
());
dLedgerCommitLog
.
getdLedgerServer
().
getdLedgerConfig
().
setEnableDiskForceClean
(
false
);
//Test fresh
dledgerStore
.
getMessageStoreConfig
().
setCleanFileForciblyEnable
(
false
);
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
Assert
.
assertEquals
(
Integer
.
MAX_VALUE
,
dledgerStore
.
getCommitLog
().
deleteExpiredFile
(
System
.
currentTimeMillis
(),
0
,
0
,
true
));
}
//should not change the value
Assert
.
assertFalse
(
dLedgerCommitLog
.
getdLedgerServer
().
getdLedgerConfig
().
isEnableDiskForceClean
());
doGetMessages
(
dledgerStore
,
topic
,
0
,
1000
,
1000
);
dledgerStore
.
shutdown
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录