Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
68aa58da
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
270
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
68aa58da
编写于
1月 21, 2019
作者:
C
chenhoudao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
recover unit test file
上级
5595fe39
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
298 addition
and
0 deletion
+298
-0
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
...va/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+298
-0
未找到文件。
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
0 → 100644
浏览文件 @
68aa58da
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.store
;
import
java.io.File
;
import
java.io.RandomAccessFile
;
import
java.net.InetAddress
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.nio.MappedByteBuffer
;
import
java.nio.channels.FileChannel
;
import
java.nio.channels.OverlappingFileLockException
;
import
java.util.Map
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.store.config.FlushDiskType
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.config.StorePathConfigHelper
;
import
org.junit.After
;
import
org.apache.rocketmq.store.stats.BrokerStatsManager
;
import
org.junit.Before
;
import
org.junit.Test
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
public
class
DefaultMessageStoreTest
{
private
final
String
StoreMessage
=
"Once, there was a chance for me!"
;
private
int
QUEUE_TOTAL
=
100
;
private
AtomicInteger
QueueId
=
new
AtomicInteger
(
0
);
private
SocketAddress
BornHost
;
private
SocketAddress
StoreHost
;
private
byte
[]
MessageBody
;
private
MessageStore
messageStore
;
@Before
public
void
init
()
throws
Exception
{
StoreHost
=
new
InetSocketAddress
(
InetAddress
.
getLocalHost
(),
8123
);
BornHost
=
new
InetSocketAddress
(
InetAddress
.
getByName
(
"127.0.0.1"
),
0
);
messageStore
=
buildMessageStore
();
boolean
load
=
messageStore
.
load
();
assertTrue
(
load
);
messageStore
.
start
();
}
@Test
(
expected
=
OverlappingFileLockException
.
class
)
public
void
test_repate_restart
()
throws
Exception
{
QUEUE_TOTAL
=
1
;
MessageBody
=
StoreMessage
.
getBytes
();
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
8
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
4
);
messageStoreConfig
.
setMaxHashSlotNum
(
100
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
10
);
MessageStore
master
=
new
DefaultMessageStore
(
messageStoreConfig
,
null
,
new
MyMessageArrivingListener
(),
new
BrokerConfig
());
boolean
load
=
master
.
load
();
assertTrue
(
load
);
try
{
master
.
start
();
master
.
start
();
}
finally
{
master
.
shutdown
();
master
.
destroy
();
}
}
@After
public
void
destory
()
{
messageStore
.
shutdown
();
messageStore
.
destroy
();
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
File
file
=
new
File
(
messageStoreConfig
.
getStorePathRootDir
());
UtilAll
.
deleteFile
(
file
);
}
private
MessageStore
buildMessageStore
()
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMaxHashSlotNum
(
10000
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
100
);
messageStoreConfig
.
setFlushDiskType
(
FlushDiskType
.
SYNC_FLUSH
);
messageStoreConfig
.
setFlushIntervalConsumeQueue
(
1
);
return
new
DefaultMessageStore
(
messageStoreConfig
,
new
BrokerStatsManager
(
"simpleTest"
),
new
MyMessageArrivingListener
(),
new
BrokerConfig
());
}
@Test
public
void
testWriteAndRead
()
{
long
totalMsgs
=
10
;
QUEUE_TOTAL
=
1
;
MessageBody
=
StoreMessage
.
getBytes
();
for
(
long
i
=
0
;
i
<
totalMsgs
;
i
++)
{
messageStore
.
putMessage
(
buildMessage
());
}
for
(
long
i
=
0
;
i
<
totalMsgs
;
i
++)
{
GetMessageResult
result
=
messageStore
.
getMessage
(
"GROUP_A"
,
"TOPIC_A"
,
0
,
i
,
1024
*
1024
,
null
);
assertThat
(
result
).
isNotNull
();
result
.
release
();
}
verifyThatMasterIsFunctional
(
totalMsgs
,
messageStore
);
}
private
MessageExtBrokerInner
buildMessage
()
{
MessageExtBrokerInner
msg
=
new
MessageExtBrokerInner
();
msg
.
setTopic
(
"FooBar"
);
msg
.
setTags
(
"TAG1"
);
msg
.
setKeys
(
"Hello"
);
msg
.
setBody
(
MessageBody
);
msg
.
setKeys
(
String
.
valueOf
(
System
.
currentTimeMillis
()));
msg
.
setQueueId
(
Math
.
abs
(
QueueId
.
getAndIncrement
())
%
QUEUE_TOTAL
);
msg
.
setSysFlag
(
0
);
msg
.
setBornTimestamp
(
System
.
currentTimeMillis
());
msg
.
setStoreHost
(
StoreHost
);
msg
.
setBornHost
(
BornHost
);
return
msg
;
}
private
void
verifyThatMasterIsFunctional
(
long
totalMsgs
,
MessageStore
master
)
{
for
(
long
i
=
0
;
i
<
totalMsgs
;
i
++)
{
master
.
putMessage
(
buildMessage
());
}
for
(
long
i
=
0
;
i
<
totalMsgs
;
i
++)
{
GetMessageResult
result
=
master
.
getMessage
(
"GROUP_A"
,
"TOPIC_A"
,
0
,
i
,
1024
*
1024
,
null
);
assertThat
(
result
).
isNotNull
();
result
.
release
();
}
}
@Test
public
void
testPullSize
()
throws
Exception
{
String
topic
=
"pullSizeTopic"
;
for
(
int
i
=
0
;
i
<
32
;
i
++)
{
MessageExtBrokerInner
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
// wait for consume queue build
// the sleep time should be great than consume queue flush interval
Thread
.
sleep
(
100
);
String
group
=
"simple"
;
GetMessageResult
getMessageResult32
=
messageStore
.
getMessage
(
group
,
topic
,
0
,
0
,
32
,
null
);
assertThat
(
getMessageResult32
.
getMessageBufferList
().
size
()).
isEqualTo
(
32
);
GetMessageResult
getMessageResult20
=
messageStore
.
getMessage
(
group
,
topic
,
0
,
0
,
20
,
null
);
assertThat
(
getMessageResult20
.
getMessageBufferList
().
size
()).
isEqualTo
(
20
);
GetMessageResult
getMessageResult45
=
messageStore
.
getMessage
(
group
,
topic
,
0
,
0
,
10
,
null
);
assertThat
(
getMessageResult45
.
getMessageBufferList
().
size
()).
isEqualTo
(
10
);
}
@Test
public
void
testRecover
()
throws
Exception
{
String
topic
=
"recoverTopic"
;
MessageBody
=
StoreMessage
.
getBytes
();
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
MessageExtBrokerInner
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
Thread
.
sleep
(
100
);
//wait for build consumer queue
long
maxPhyOffset
=
messageStore
.
getMaxPhyOffset
();
long
maxCqOffset
=
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
);
//1.just reboot
messageStore
.
shutdown
();
messageStore
=
buildMessageStore
();
boolean
load
=
messageStore
.
load
();
assertTrue
(
load
);
messageStore
.
start
();
assertTrue
(
maxPhyOffset
==
messageStore
.
getMaxPhyOffset
());
assertTrue
(
maxCqOffset
==
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
));
//2.damage commitlog and reboot normal
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
MessageExtBrokerInner
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
Thread
.
sleep
(
100
);
long
secondLastPhyOffset
=
messageStore
.
getMaxPhyOffset
();
long
secondLastCqOffset
=
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
);
MessageExtBrokerInner
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
messageStore
.
shutdown
();
//damage last message
damageCommitlog
(
secondLastPhyOffset
);
//reboot
messageStore
=
buildMessageStore
();
load
=
messageStore
.
load
();
assertTrue
(
load
);
messageStore
.
start
();
assertTrue
(
secondLastPhyOffset
==
messageStore
.
getMaxPhyOffset
());
assertTrue
(
secondLastCqOffset
==
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
));
//3.damage commitlog and reboot abnormal
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
Thread
.
sleep
(
100
);
secondLastPhyOffset
=
messageStore
.
getMaxPhyOffset
();
secondLastCqOffset
=
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
);
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
messageStore
.
shutdown
();
//damage last message
damageCommitlog
(
secondLastPhyOffset
);
//add abort file
String
fileName
=
StorePathConfigHelper
.
getAbortFile
(((
DefaultMessageStore
)
messageStore
).
getMessageStoreConfig
().
getStorePathRootDir
());
File
file
=
new
File
(
fileName
);
MappedFile
.
ensureDirOK
(
file
.
getParent
());
file
.
createNewFile
();
messageStore
=
buildMessageStore
();
load
=
messageStore
.
load
();
assertTrue
(
load
);
messageStore
.
start
();
assertTrue
(
secondLastPhyOffset
==
messageStore
.
getMaxPhyOffset
());
assertTrue
(
secondLastCqOffset
==
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
));
//message write again
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
}
private
void
damageCommitlog
(
long
offset
)
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
File
file
=
new
File
(
messageStoreConfig
.
getStorePathCommitLog
()
+
File
.
separator
+
"00000000000000000000"
);
FileChannel
fileChannel
=
new
RandomAccessFile
(
file
,
"rw"
).
getChannel
();
MappedByteBuffer
mappedByteBuffer
=
fileChannel
.
map
(
FileChannel
.
MapMode
.
READ_WRITE
,
0
,
1024
*
1024
*
10
);
int
bodyLen
=
mappedByteBuffer
.
getInt
((
int
)
offset
+
84
);
int
topicLenIndex
=
(
int
)
offset
+
84
+
bodyLen
+
4
;
mappedByteBuffer
.
position
(
topicLenIndex
);
mappedByteBuffer
.
putInt
(
0
);
mappedByteBuffer
.
putInt
(
0
);
mappedByteBuffer
.
putInt
(
0
);
mappedByteBuffer
.
putInt
(
0
);
mappedByteBuffer
.
force
();
fileChannel
.
force
(
true
);
fileChannel
.
close
();
}
private
class
MyMessageArrivingListener
implements
MessageArrivingListener
{
@Override
public
void
arriving
(
String
topic
,
int
queueId
,
long
logicOffset
,
long
tagsCode
,
long
msgStoreTime
,
byte
[]
filterBitMap
,
Map
<
String
,
String
>
properties
)
{
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录