Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
336599b1
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
336599b1
编写于
5月 25, 2020
作者:
R
rongtong
提交者:
GitHub
5月 25, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1403 from qqeasonchen/dev_bing
[ISSUE #1400]do disk space detection in another thread
上级
a8f77301
9176e057
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
70 addition
and
1 deletion
+70
-1
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+33
-1
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
...che/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+37
-0
未找到文件。
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
336599b1
...
@@ -114,6 +114,9 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -114,6 +114,9 @@ public class DefaultMessageStore implements MessageStore {
boolean
shutDownNormal
=
false
;
boolean
shutDownNormal
=
false
;
private
final
ScheduledExecutorService
diskCheckScheduledExecutorService
=
Executors
.
newSingleThreadScheduledExecutor
(
new
ThreadFactoryImpl
(
"DiskCheckScheduledThread"
));
public
DefaultMessageStore
(
final
MessageStoreConfig
messageStoreConfig
,
final
BrokerStatsManager
brokerStatsManager
,
public
DefaultMessageStore
(
final
MessageStoreConfig
messageStoreConfig
,
final
BrokerStatsManager
brokerStatsManager
,
final
MessageArrivingListener
messageArrivingListener
,
final
BrokerConfig
brokerConfig
)
throws
IOException
{
final
MessageArrivingListener
messageArrivingListener
,
final
BrokerConfig
brokerConfig
)
throws
IOException
{
this
.
messageArrivingListener
=
messageArrivingListener
;
this
.
messageArrivingListener
=
messageArrivingListener
;
...
@@ -293,7 +296,7 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -293,7 +296,7 @@ public class DefaultMessageStore implements MessageStore {
this
.
shutdown
=
true
;
this
.
shutdown
=
true
;
this
.
scheduledExecutorService
.
shutdown
();
this
.
scheduledExecutorService
.
shutdown
();
this
.
diskCheckScheduledExecutorService
.
shutdown
();
try
{
try
{
Thread
.
sleep
(
1000
);
Thread
.
sleep
(
1000
);
...
@@ -1329,6 +1332,11 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -1329,6 +1332,11 @@ public class DefaultMessageStore implements MessageStore {
// DefaultMessageStore.this.cleanExpiredConsumerQueue();
// DefaultMessageStore.this.cleanExpiredConsumerQueue();
// }
// }
// }, 1, 1, TimeUnit.HOURS);
// }, 1, 1, TimeUnit.HOURS);
this
.
diskCheckScheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
public
void
run
()
{
DefaultMessageStore
.
this
.
cleanCommitLogService
.
isSpaceFull
();
}
},
1000L
,
10000L
,
TimeUnit
.
MILLISECONDS
);
}
}
private
void
cleanFilesPeriodically
()
{
private
void
cleanFilesPeriodically
()
{
...
@@ -1727,6 +1735,30 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -1727,6 +1735,30 @@ public class DefaultMessageStore implements MessageStore {
public
void
setManualDeleteFileSeveralTimes
(
int
manualDeleteFileSeveralTimes
)
{
public
void
setManualDeleteFileSeveralTimes
(
int
manualDeleteFileSeveralTimes
)
{
this
.
manualDeleteFileSeveralTimes
=
manualDeleteFileSeveralTimes
;
this
.
manualDeleteFileSeveralTimes
=
manualDeleteFileSeveralTimes
;
}
}
public
boolean
isSpaceFull
()
{
String
storePathPhysic
=
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getStorePathCommitLog
();
double
physicRatio
=
UtilAll
.
getDiskPartitionSpaceUsedPercent
(
storePathPhysic
);
double
ratio
=
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getDiskMaxUsedSpaceRatio
()
/
100.0
;
if
(
physicRatio
>
ratio
)
{
DefaultMessageStore
.
log
.
info
(
"physic disk of commitLog used: "
+
physicRatio
);
}
if
(
physicRatio
>
this
.
diskSpaceWarningLevelRatio
)
{
boolean
diskok
=
DefaultMessageStore
.
this
.
runningFlags
.
getAndMakeDiskFull
();
if
(
diskok
)
{
DefaultMessageStore
.
log
.
error
(
"physic disk of commitLog maybe full soon, used "
+
physicRatio
+
", so mark disk full"
);
}
return
true
;
}
else
{
boolean
diskok
=
DefaultMessageStore
.
this
.
runningFlags
.
getAndMakeDiskOK
();
if
(!
diskok
)
{
DefaultMessageStore
.
log
.
info
(
"physic disk space of commitLog OK "
+
physicRatio
+
", so mark disk ok"
);
}
return
false
;
}
}
}
}
class
CleanConsumeQueueService
{
class
CleanConsumeQueueService
{
...
...
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
浏览文件 @
336599b1
...
@@ -72,6 +72,43 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -72,6 +72,43 @@ public class DefaultMessageStoreCleanFilesTest {
bornHost
=
new
InetSocketAddress
(
InetAddress
.
getByName
(
"127.0.0.1"
),
0
);
bornHost
=
new
InetSocketAddress
(
InetAddress
.
getByName
(
"127.0.0.1"
),
0
);
}
}
@Test
public
void
testIsSpaceFullFunctionEmpty2Full
()
throws
Exception
{
String
deleteWhen
=
"04"
;
// the min value of diskMaxUsedSpaceRatio.
int
diskMaxUsedSpaceRatio
=
1
;
// used to set disk-full flag
double
diskSpaceCleanForciblyRatio
=
0.01
D
;
initMessageStore
(
deleteWhen
,
diskMaxUsedSpaceRatio
,
diskSpaceCleanForciblyRatio
);
// build and put 55 messages, exactly one message per CommitLog file.
buildAndPutMessagesToMessageStore
(
msgCount
);
MappedFileQueue
commitLogQueue
=
getMappedFileQueueCommitLog
();
assertEquals
(
fileCountCommitLog
,
commitLogQueue
.
getMappedFiles
().
size
());
int
fileCountConsumeQueue
=
getFileCountConsumeQueue
();
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
cleanCommitLogService
.
isSpaceFull
();
assertEquals
(
1
<<
4
,
messageStore
.
getRunningFlags
().
getFlagBits
()
&
(
1
<<
4
));
messageStore
.
shutdown
();
messageStore
.
destroy
();
}
@Test
public
void
testIsSpaceFullFunctionFull2Empty
()
throws
Exception
{
String
deleteWhen
=
"04"
;
// the min value of diskMaxUsedSpaceRatio.
int
diskMaxUsedSpaceRatio
=
1
;
//use to reset disk-full flag
double
diskSpaceCleanForciblyRatio
=
0.999
D
;
initMessageStore
(
deleteWhen
,
diskMaxUsedSpaceRatio
,
diskSpaceCleanForciblyRatio
);
//set disk full
messageStore
.
getRunningFlags
().
getAndMakeDiskFull
();
cleanCommitLogService
.
isSpaceFull
();
assertEquals
(
0
,
messageStore
.
getRunningFlags
().
getFlagBits
()
&
(
1
<<
4
));
}
@Test
@Test
public
void
testDeleteExpiredFilesByTimeUp
()
throws
Exception
{
public
void
testDeleteExpiredFilesByTimeUp
()
throws
Exception
{
String
deleteWhen
=
Calendar
.
getInstance
().
get
(
Calendar
.
HOUR_OF_DAY
)
+
""
;
String
deleteWhen
=
Calendar
.
getInstance
().
get
(
Calendar
.
HOUR_OF_DAY
)
+
""
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录