Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
189c6e30
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
267
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
189c6e30
编写于
3月 27, 2020
作者:
R
rongtong
提交者:
GitHub
3月 27, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1817 from rushsky518/deleteFiles_unit_test
Add delete IndexFile unit test
上级
549f72f6
fe724b30
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
67 addition
and
4 deletion
+67
-4
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
...che/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+67
-4
未找到文件。
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
浏览文件 @
189c6e30
...
@@ -19,8 +19,12 @@ package org.apache.rocketmq.store;
...
@@ -19,8 +19,12 @@ package org.apache.rocketmq.store;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.store.config.FlushDiskType
;
import
org.apache.rocketmq.store.config.FlushDiskType
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.index.IndexFile
;
import
org.apache.rocketmq.store.index.IndexService
;
import
org.apache.rocketmq.store.stats.BrokerStatsManager
;
import
org.apache.rocketmq.store.stats.BrokerStatsManager
;
import
org.junit.After
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Before
;
...
@@ -31,7 +35,9 @@ import java.lang.reflect.Field;
...
@@ -31,7 +35,9 @@ import java.lang.reflect.Field;
import
java.net.InetAddress
;
import
java.net.InetAddress
;
import
java.net.InetSocketAddress
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.net.SocketAddress
;
import
java.util.ArrayList
;
import
java.util.Calendar
;
import
java.util.Calendar
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Map
;
import
java.util.UUID
;
import
java.util.UUID
;
...
@@ -52,6 +58,7 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -52,6 +58,7 @@ public class DefaultMessageStoreCleanFilesTest {
private
SocketAddress
storeHost
;
private
SocketAddress
storeHost
;
private
String
topic
=
"test"
;
private
String
topic
=
"test"
;
private
String
keys
=
"hello"
;
private
int
queueId
=
0
;
private
int
queueId
=
0
;
private
int
fileCountCommitLog
=
55
;
private
int
fileCountCommitLog
=
55
;
// exactly one message per CommitLog file.
// exactly one message per CommitLog file.
...
@@ -87,6 +94,9 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -87,6 +94,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
int
fileCountIndexFile
=
getFileCountIndexFile
();
assertEquals
(
fileCountIndexFile
,
getIndexFileList
().
size
());
int
expireFileCount
=
15
;
int
expireFileCount
=
15
;
expireFiles
(
commitLogQueue
,
expireFileCount
);
expireFiles
(
commitLogQueue
,
expireFileCount
);
...
@@ -101,6 +111,10 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -101,6 +111,10 @@ public class DefaultMessageStoreCleanFilesTest {
int
msgCountPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
msgCountPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
expectDeleteCountConsumeQueue
=
(
int
)
Math
.
floor
((
double
)
expectDeletedCount
/
msgCountPerFile
);
int
expectDeleteCountConsumeQueue
=
(
int
)
Math
.
floor
((
double
)
expectDeletedCount
/
msgCountPerFile
);
assertEquals
(
fileCountConsumeQueue
-
expectDeleteCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
assertEquals
(
fileCountConsumeQueue
-
expectDeleteCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
int
msgCountPerIndexFile
=
getMsgCountPerIndexFile
();
int
expectDeleteCountIndexFile
=
(
int
)
Math
.
floor
((
double
)
expectDeletedCount
/
msgCountPerIndexFile
);
assertEquals
(
fileCountIndexFile
-
expectDeleteCountIndexFile
,
getIndexFileList
().
size
());
}
}
}
}
...
@@ -126,6 +140,9 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -126,6 +140,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
int
fileCountIndexFile
=
getFileCountIndexFile
();
assertEquals
(
fileCountIndexFile
,
getIndexFileList
().
size
());
int
expireFileCount
=
15
;
int
expireFileCount
=
15
;
expireFiles
(
commitLogQueue
,
expireFileCount
);
expireFiles
(
commitLogQueue
,
expireFileCount
);
...
@@ -140,6 +157,10 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -140,6 +157,10 @@ public class DefaultMessageStoreCleanFilesTest {
int
msgCountPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
msgCountPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
expectDeleteCountConsumeQueue
=
(
int
)
Math
.
floor
((
double
)
expectDeletedCount
/
msgCountPerFile
);
int
expectDeleteCountConsumeQueue
=
(
int
)
Math
.
floor
((
double
)
expectDeletedCount
/
msgCountPerFile
);
assertEquals
(
fileCountConsumeQueue
-
expectDeleteCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
assertEquals
(
fileCountConsumeQueue
-
expectDeleteCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
int
msgCountPerIndexFile
=
getMsgCountPerIndexFile
();
int
expectDeleteCountIndexFile
=
(
int
)
Math
.
floor
((
double
)
expectDeletedCount
/
msgCountPerIndexFile
);
assertEquals
(
fileCountIndexFile
-
expectDeleteCountIndexFile
,
getIndexFileList
().
size
());
}
}
}
}
...
@@ -165,6 +186,9 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -165,6 +186,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
int
fileCountIndexFile
=
getFileCountIndexFile
();
assertEquals
(
fileCountIndexFile
,
getIndexFileList
().
size
());
// In this case, there is no need to expire the files.
// In this case, there is no need to expire the files.
// int expireFileCount = 15;
// int expireFileCount = 15;
// expireFiles(commitLogQueue, expireFileCount);
// expireFiles(commitLogQueue, expireFileCount);
...
@@ -181,6 +205,10 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -181,6 +205,10 @@ public class DefaultMessageStoreCleanFilesTest {
int
msgCountPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
msgCountPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
expectDeleteCountConsumeQueue
=
(
int
)
Math
.
floor
((
double
)
(
a
*
10
)
/
msgCountPerFile
);
int
expectDeleteCountConsumeQueue
=
(
int
)
Math
.
floor
((
double
)
(
a
*
10
)
/
msgCountPerFile
);
assertEquals
(
fileCountConsumeQueue
-
expectDeleteCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
assertEquals
(
fileCountConsumeQueue
-
expectDeleteCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
int
msgCountPerIndexFile
=
getMsgCountPerIndexFile
();
int
expectDeleteCountIndexFile
=
(
int
)
Math
.
floor
((
double
)
(
a
*
10
)
/
msgCountPerIndexFile
);
assertEquals
(
fileCountIndexFile
-
expectDeleteCountIndexFile
,
getIndexFileList
().
size
());
}
}
}
}
...
@@ -208,6 +236,9 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -208,6 +236,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
MappedFileQueue
consumeQueue
=
getMappedFileQueueConsumeQueue
();
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
assertEquals
(
fileCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
int
fileCountIndexFile
=
getFileCountIndexFile
();
assertEquals
(
fileCountIndexFile
,
getIndexFileList
().
size
());
int
expireFileCount
=
15
;
int
expireFileCount
=
15
;
expireFiles
(
commitLogQueue
,
expireFileCount
);
expireFiles
(
commitLogQueue
,
expireFileCount
);
...
@@ -222,6 +253,10 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -222,6 +253,10 @@ public class DefaultMessageStoreCleanFilesTest {
int
msgCountPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
msgCountPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
expectDeleteCountConsumeQueue
=
(
int
)
Math
.
floor
((
double
)
expectDeletedCount
/
msgCountPerFile
);
int
expectDeleteCountConsumeQueue
=
(
int
)
Math
.
floor
((
double
)
expectDeletedCount
/
msgCountPerFile
);
assertEquals
(
fileCountConsumeQueue
-
expectDeleteCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
assertEquals
(
fileCountConsumeQueue
-
expectDeleteCountConsumeQueue
,
consumeQueue
.
getMappedFiles
().
size
());
int
msgCountPerIndexFile
=
getMsgCountPerIndexFile
();
int
expectDeleteCountIndexFile
=
(
int
)
Math
.
floor
((
double
)
(
a
*
10
)
/
msgCountPerIndexFile
);
assertEquals
(
fileCountIndexFile
-
expectDeleteCountIndexFile
,
getIndexFileList
().
size
());
}
}
}
}
...
@@ -274,32 +309,60 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -274,32 +309,60 @@ public class DefaultMessageStoreCleanFilesTest {
return
fileQueue
;
return
fileQueue
;
}
}
private
ArrayList
<
IndexFile
>
getIndexFileList
()
throws
Exception
{
Field
indexServiceField
=
messageStore
.
getClass
().
getDeclaredField
(
"indexService"
);
indexServiceField
.
setAccessible
(
true
);
IndexService
indexService
=
(
IndexService
)
indexServiceField
.
get
(
messageStore
);
Field
indexFileListField
=
indexService
.
getClass
().
getDeclaredField
(
"indexFileList"
);
indexFileListField
.
setAccessible
(
true
);
ArrayList
<
IndexFile
>
indexFileList
=
(
ArrayList
<
IndexFile
>)
indexFileListField
.
get
(
indexService
);
return
indexFileList
;
}
private
int
getFileCountConsumeQueue
()
{
private
int
getFileCountConsumeQueue
()
{
int
countPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
int
countPerFile
=
getMsgCountPerConsumeQueueMappedFile
();
double
fileCount
=
(
double
)
msgCount
/
countPerFile
;
double
fileCount
=
(
double
)
msgCount
/
countPerFile
;
return
(
int
)
Math
.
ceil
(
fileCount
);
return
(
int
)
Math
.
ceil
(
fileCount
);
}
}
private
int
getFileCountIndexFile
()
{
int
countPerFile
=
getMsgCountPerIndexFile
();
double
fileCount
=
(
double
)
msgCount
/
countPerFile
;
return
(
int
)
Math
.
ceil
(
fileCount
);
}
private
int
getMsgCountPerConsumeQueueMappedFile
()
{
private
int
getMsgCountPerConsumeQueueMappedFile
()
{
int
size
=
messageStore
.
getMessageStoreConfig
().
getMappedFileSizeConsumeQueue
();
int
size
=
messageStore
.
getMessageStoreConfig
().
getMappedFileSizeConsumeQueue
();
return
size
/
CQ_STORE_UNIT_SIZE
;
// 7 in this case
return
size
/
CQ_STORE_UNIT_SIZE
;
// 7 in this case
}
}
private
int
getMsgCountPerIndexFile
()
{
// 7 in this case
return
messageStore
.
getMessageStoreConfig
().
getMaxIndexNum
()
-
1
;
}
private
void
buildAndPutMessagesToMessageStore
(
int
msgCount
)
throws
Exception
{
private
void
buildAndPutMessagesToMessageStore
(
int
msgCount
)
throws
Exception
{
int
msgLen
=
topic
.
getBytes
(
CHARSET_UTF8
).
length
+
91
;
int
msgLen
=
topic
.
getBytes
(
CHARSET_UTF8
).
length
+
91
;
Map
<
String
,
String
>
properties
=
new
HashMap
<>(
4
);
properties
.
put
(
MessageConst
.
PROPERTY_KEYS
,
keys
);
String
s
=
MessageDecoder
.
messageProperties2String
(
properties
);
int
propertiesLen
=
s
.
getBytes
(
CHARSET_UTF8
).
length
;
int
commitLogEndFileMinBlankLength
=
4
+
4
;
int
commitLogEndFileMinBlankLength
=
4
+
4
;
int
singleMsgBodyLen
=
mappedFileSize
-
msgLen
-
commitLogEndFileMinBlankLength
;
int
singleMsgBodyLen
=
mappedFileSize
-
msgLen
-
propertiesLen
-
commitLogEndFileMinBlankLength
;
for
(
int
i
=
0
;
i
<
msgCount
;
i
++)
{
for
(
int
i
=
0
;
i
<
msgCount
;
i
++)
{
MessageExtBrokerInner
msg
=
new
MessageExtBrokerInner
();
MessageExtBrokerInner
msg
=
new
MessageExtBrokerInner
();
msg
.
setTopic
(
topic
);
msg
.
setTopic
(
topic
);
msg
.
setBody
(
new
byte
[
singleMsgBodyLen
]);
msg
.
setBody
(
new
byte
[
singleMsgBodyLen
]);
msg
.
setKeys
(
String
.
valueOf
(
System
.
currentTimeMillis
())
);
msg
.
setKeys
(
keys
);
msg
.
setQueueId
(
queueId
);
msg
.
setQueueId
(
queueId
);
msg
.
setSysFlag
(
0
);
msg
.
setSysFlag
(
0
);
msg
.
setBornTimestamp
(
System
.
currentTimeMillis
());
msg
.
setBornTimestamp
(
System
.
currentTimeMillis
());
msg
.
setStoreHost
(
storeHost
);
msg
.
setStoreHost
(
storeHost
);
msg
.
setBornHost
(
bornHost
);
msg
.
setBornHost
(
bornHost
);
msg
.
setPropertiesString
(
MessageDecoder
.
messageProperties2String
(
msg
.
getProperties
()));
PutMessageResult
result
=
messageStore
.
putMessage
(
msg
);
PutMessageResult
result
=
messageStore
.
putMessage
(
msg
);
assertTrue
(
result
!=
null
&&
result
.
isOk
());
assertTrue
(
result
!=
null
&&
result
.
isOk
());
}
}
...
@@ -324,8 +387,8 @@ public class DefaultMessageStoreCleanFilesTest {
...
@@ -324,8 +387,8 @@ public class DefaultMessageStoreCleanFilesTest {
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfigForTest
();
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfigForTest
();
messageStoreConfig
.
setMappedFileSizeCommitLog
(
mappedFileSize
);
messageStoreConfig
.
setMappedFileSizeCommitLog
(
mappedFileSize
);
messageStoreConfig
.
setMappedFileSizeConsumeQueue
(
mappedFileSize
);
messageStoreConfig
.
setMappedFileSizeConsumeQueue
(
mappedFileSize
);
messageStoreConfig
.
setMaxHashSlotNum
(
100
00
);
messageStoreConfig
.
setMaxHashSlotNum
(
100
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
100
);
messageStoreConfig
.
setMaxIndexNum
(
8
);
messageStoreConfig
.
setFlushDiskType
(
FlushDiskType
.
SYNC_FLUSH
);
messageStoreConfig
.
setFlushDiskType
(
FlushDiskType
.
SYNC_FLUSH
);
messageStoreConfig
.
setFlushIntervalConsumeQueue
(
1
);
messageStoreConfig
.
setFlushIntervalConsumeQueue
(
1
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录