Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
小五666\n哈哈
Rocketmq
提交
6e7df557
R
Rocketmq
项目概览
小五666\n哈哈
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
6e7df557
编写于
7月 12, 2019
作者:
V
vongosling
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Typo fix and some other polish
上级
d4c2608f
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
57 addition
and
74 deletion
+57
-74
broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
...org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+0
-4
broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
...he/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+2
-2
docs/cn/best_practice.md
docs/cn/best_practice.md
+1
-1
docs/en/Operations_Broker.md
docs/en/Operations_Broker.md
+1
-1
docs/en/best_practice.md
docs/en/best_practice.md
+1
-1
store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
.../org/apache/rocketmq/store/AllocateMappedFileService.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+4
-4
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+2
-2
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
...e/src/main/java/org/apache/rocketmq/store/MappedFile.java
+3
-6
store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
...a/org/apache/rocketmq/store/SelectMappedBufferResult.java
+0
-4
store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
...in/java/org/apache/rocketmq/store/TransientStorePool.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
.../org/apache/rocketmq/store/config/MessageStoreConfig.java
+10
-10
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
...a/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+2
-2
store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
.../main/java/org/apache/rocketmq/store/ha/HAConnection.java
+1
-1
store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
...st/java/org/apache/rocketmq/store/AppendCallbackTest.java
+2
-2
store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
...t/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+2
-2
store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
...test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+2
-2
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
...che/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+3
-3
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
...pache/rocketmq/store/DefaultMessageStoreShutDownTest.java
+2
-2
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
...va/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+4
-5
store/src/test/java/org/apache/rocketmq/store/HATest.java
store/src/test/java/org/apache/rocketmq/store/HATest.java
+2
-6
store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
...g/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+4
-4
store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java
...a/org/apache/rocketmq/store/dledger/MixCommitlogTest.java
+3
-3
store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
...e/rocketmq/store/schedule/ScheduleMessageServiceTest.java
+2
-2
test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
...va/org/apache/rocketmq/test/base/IntegrationTestBase.java
+1
-1
test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
...ocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
+1
-2
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
浏览文件 @
6e7df557
...
...
@@ -28,10 +28,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import
org.apache.rocketmq.remoting.netty.RequestTask
;
import
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode
;
/**
* BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and
* {@link BrokerController#pullThreadPoolQueue}
*/
public
class
BrokerFastFailure
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
final
ScheduledExecutorService
scheduledExecutorService
=
Executors
.
newSingleThreadScheduledExecutor
(
new
ThreadFactoryImpl
(
...
...
broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
浏览文件 @
6e7df557
...
...
@@ -122,8 +122,8 @@ public class MessageStoreWithFilterTest {
public
MessageStoreConfig
buildStoreConfig
(
int
commitLogFileSize
,
int
cqFileSize
,
boolean
enableCqExt
,
int
cqExtFileSize
)
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
commitLogFileSize
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
cqFileSize
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
commitLogFileSize
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
cqFileSize
);
messageStoreConfig
.
setMappedFileSizeConsumeQueueExt
(
cqExtFileSize
);
messageStoreConfig
.
setMessageIndexEnable
(
false
);
messageStoreConfig
.
setEnableConsumeQueueExt
(
enableCqExt
);
...
...
docs/cn/best_practice.md
浏览文件 @
6e7df557
...
...
@@ -188,7 +188,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
| brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
| storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
| storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
| mapedFileSizeCommitLog | 1024
* 1024 *
1024(1G) | commit log 的映射文件大小 |
| map
p
edFileSizeCommitLog | 1024
* 1024 *
1024(1G) | commit log 的映射文件大小 |
| deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
| fileReservedTime | 72 | 以小时计算的文件保留时间 |
| brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
...
...
docs/en/Operations_Broker.md
浏览文件 @
6e7df557
...
...
@@ -16,7 +16,7 @@ ASYNC_FLUSH is recommended, for SYNC_FLUSH is expensive and will cause too much
| brokerId | 0 | broker id, 0 means master, positive integers mean slave |
| storePathCommitLog | $HOME/store/commitlog/ | file path for commit log |
| storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue |
| mapedFileSizeCommitLog | 1024
* 1024 *
1024(1G) | mapped file size for commit log |
| map
p
edFileSizeCommitLog | 1024
* 1024 *
1024(1G) | mapped file size for commit log |
| deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |
| fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |
| brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
...
...
docs/en/best_practice.md
浏览文件 @
6e7df557
...
...
@@ -22,7 +22,7 @@
| brokerId | 0 | broker id, 0 means master, positive integers mean slave |
| storePathCommitLog | $HOME/store/commitlog/ | file path for commit log |
| storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue |
| mapedFileSizeCommitLog | 1024
* 1024 *
1024(1G) | mapped file size for commit log |
| map
p
edFileSizeCommitLog | 1024
* 1024 *
1024(1G) | mapped file size for commit log |
| deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |
| fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |
| brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
...
...
store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
浏览文件 @
6e7df557
...
...
@@ -185,7 +185,7 @@ public class AllocateMappedFileService extends ServiceThread {
// pre write mappedFile
if
(
mappedFile
.
getFileSize
()
>=
this
.
messageStore
.
getMessageStoreConfig
()
.
getMapedFileSizeCommitLog
()
.
getMap
p
edFileSizeCommitLog
()
&&
this
.
messageStore
.
getMessageStoreConfig
().
isWarmMapedFileEnable
())
{
mappedFile
.
warmMappedFile
(
this
.
messageStore
.
getMessageStoreConfig
().
getFlushDiskType
(),
...
...
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
6e7df557
...
...
@@ -65,7 +65,7 @@ public class CommitLog {
public
CommitLog
(
final
DefaultMessageStore
defaultMessageStore
)
{
this
.
mappedFileQueue
=
new
MappedFileQueue
(
defaultMessageStore
.
getMessageStoreConfig
().
getStorePathCommitLog
(),
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
(),
defaultMessageStore
.
getAllocateMappedFileService
());
defaultMessageStore
.
getMessageStoreConfig
().
getMap
p
edFileSizeCommitLog
(),
defaultMessageStore
.
getAllocateMappedFileService
());
this
.
defaultMessageStore
=
defaultMessageStore
;
if
(
FlushDiskType
.
SYNC_FLUSH
==
defaultMessageStore
.
getMessageStoreConfig
().
getFlushDiskType
())
{
...
...
@@ -144,7 +144,7 @@ public class CommitLog {
}
public
SelectMappedBufferResult
getData
(
final
long
offset
,
final
boolean
returnFirstOnNotFound
)
{
int
mappedFileSize
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
();
int
mappedFileSize
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getMap
p
edFileSizeCommitLog
();
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
findMappedFileByOffset
(
offset
,
returnFirstOnNotFound
);
if
(
mappedFile
!=
null
)
{
int
pos
=
(
int
)
(
offset
%
mappedFileSize
);
...
...
@@ -828,7 +828,7 @@ public class CommitLog {
}
public
SelectMappedBufferResult
getMessage
(
final
long
offset
,
final
int
size
)
{
int
mappedFileSize
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
();
int
mappedFileSize
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getMap
p
edFileSizeCommitLog
();
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
findMappedFileByOffset
(
offset
,
offset
==
0
);
if
(
mappedFile
!=
null
)
{
int
pos
=
(
int
)
(
offset
%
mappedFileSize
);
...
...
@@ -838,7 +838,7 @@ public class CommitLog {
}
public
long
rollNextFile
(
final
long
offset
)
{
int
mappedFileSize
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
();
int
mappedFileSize
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getMap
p
edFileSizeCommitLog
();
return
offset
+
mappedFileSize
-
offset
%
mappedFileSize
;
}
...
...
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
6e7df557
...
...
@@ -1144,7 +1144,7 @@ public class DefaultMessageStore implements MessageStore {
topic
,
queueId
,
StorePathConfigHelper
.
getStorePathConsumeQueue
(
this
.
messageStoreConfig
.
getStorePathRootDir
()),
this
.
getMessageStoreConfig
().
getMapedFileSizeConsumeQueue
(),
this
.
getMessageStoreConfig
().
getMap
p
edFileSizeConsumeQueue
(),
this
);
ConsumeQueue
oldLogic
=
map
.
putIfAbsent
(
queueId
,
newLogic
);
if
(
oldLogic
!=
null
)
{
...
...
@@ -1309,7 +1309,7 @@ public class DefaultMessageStore implements MessageStore {
topic
,
queueId
,
StorePathConfigHelper
.
getStorePathConsumeQueue
(
this
.
messageStoreConfig
.
getStorePathRootDir
()),
this
.
getMessageStoreConfig
().
getMapedFileSizeConsumeQueue
(),
this
.
getMessageStoreConfig
().
getMap
p
edFileSizeConsumeQueue
(),
this
);
this
.
putConsumeQueue
(
topic
,
queueId
,
logic
);
if
(!
logic
.
load
())
{
...
...
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
浏览文件 @
6e7df557
...
...
@@ -49,7 +49,6 @@ public class MappedFile extends ReferenceResource {
private
static
final
AtomicInteger
TOTAL_MAPPED_FILES
=
new
AtomicInteger
(
0
);
protected
final
AtomicInteger
wrotePosition
=
new
AtomicInteger
(
0
);
//ADD BY ChenYang
protected
final
AtomicInteger
committedPosition
=
new
AtomicInteger
(
0
);
private
final
AtomicInteger
flushedPosition
=
new
AtomicInteger
(
0
);
protected
int
fileSize
;
...
...
@@ -119,7 +118,6 @@ public class MappedFile extends ReferenceResource {
private
static
ByteBuffer
viewed
(
ByteBuffer
buffer
)
{
String
methodName
=
"viewedBuffer"
;
Method
[]
methods
=
buffer
.
getClass
().
getMethods
();
for
(
int
i
=
0
;
i
<
methods
.
length
;
i
++)
{
if
(
methods
[
i
].
getName
().
equals
(
"attachment"
))
{
...
...
@@ -166,10 +164,10 @@ public class MappedFile extends ReferenceResource {
TOTAL_MAPPED_FILES
.
incrementAndGet
();
ok
=
true
;
}
catch
(
FileNotFoundException
e
)
{
log
.
error
(
"
create file channel "
+
this
.
fileName
+
" Failed. "
,
e
);
log
.
error
(
"
Failed to create file "
+
this
.
fileName
,
e
);
throw
e
;
}
catch
(
IOException
e
)
{
log
.
error
(
"
map file "
+
this
.
fileName
+
" Failed. "
,
e
);
log
.
error
(
"
Failed to map file "
+
this
.
fileName
,
e
);
throw
e
;
}
finally
{
if
(!
ok
&&
this
.
fileChannel
!=
null
)
{
...
...
@@ -207,7 +205,7 @@ public class MappedFile extends ReferenceResource {
if
(
currentPos
<
this
.
fileSize
)
{
ByteBuffer
byteBuffer
=
writeBuffer
!=
null
?
writeBuffer
.
slice
()
:
this
.
mappedByteBuffer
.
slice
();
byteBuffer
.
position
(
currentPos
);
AppendMessageResult
result
=
null
;
AppendMessageResult
result
;
if
(
messageExt
instanceof
MessageExtBrokerInner
)
{
result
=
cb
.
doAppend
(
this
.
getFileFromOffset
(),
byteBuffer
,
this
.
fileSize
-
currentPos
,
(
MessageExtBrokerInner
)
messageExt
);
}
else
if
(
messageExt
instanceof
MessageExtBatch
)
{
...
...
@@ -382,7 +380,6 @@ public class MappedFile extends ReferenceResource {
public
SelectMappedBufferResult
selectMappedBuffer
(
int
pos
,
int
size
)
{
int
readPosition
=
getReadPosition
();
if
((
pos
+
size
)
<=
readPosition
)
{
if
(
this
.
hold
())
{
ByteBuffer
byteBuffer
=
this
.
mappedByteBuffer
.
slice
();
byteBuffer
.
position
(
pos
);
...
...
store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
浏览文件 @
6e7df557
...
...
@@ -48,10 +48,6 @@ public class SelectMappedBufferResult {
this
.
byteBuffer
.
limit
(
this
.
size
);
}
/* public MappedFile getMappedFile() {
return mappedFile;
}*/
// @Override
// protected void finalize() {
// if (this.mappedFile != null) {
...
...
store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
浏览文件 @
6e7df557
...
...
@@ -39,7 +39,7 @@ public class TransientStorePool {
public
TransientStorePool
(
final
MessageStoreConfig
storeConfig
)
{
this
.
storeConfig
=
storeConfig
;
this
.
poolSize
=
storeConfig
.
getTransientStorePoolSize
();
this
.
fileSize
=
storeConfig
.
getMapedFileSizeCommitLog
();
this
.
fileSize
=
storeConfig
.
getMap
p
edFileSizeCommitLog
();
this
.
availableBuffers
=
new
ConcurrentLinkedDeque
<>();
}
...
...
store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
浏览文件 @
6e7df557
...
...
@@ -31,9 +31,9 @@ public class MessageStoreConfig {
+
File
.
separator
+
"commitlog"
;
// CommitLog file size,default is 1G
private
int
mapedFileSizeCommitLog
=
1024
*
1024
*
1024
;
private
int
map
p
edFileSizeCommitLog
=
1024
*
1024
*
1024
;
// ConsumeQueue file size,default is 30W
private
int
mapedFileSizeConsumeQueue
=
300000
*
ConsumeQueue
.
CQ_STORE_UNIT_SIZE
;
private
int
map
p
edFileSizeConsumeQueue
=
300000
*
ConsumeQueue
.
CQ_STORE_UNIT_SIZE
;
// enable consume queue ext
private
boolean
enableConsumeQueueExt
=
false
;
// ConsumeQueue extend file size, 48M
...
...
@@ -188,22 +188,22 @@ public class MessageStoreConfig {
this
.
warmMapedFileEnable
=
warmMapedFileEnable
;
}
public
int
getMapedFileSizeCommitLog
()
{
return
mapedFileSizeCommitLog
;
public
int
getMap
p
edFileSizeCommitLog
()
{
return
map
p
edFileSizeCommitLog
;
}
public
void
setMap
edFileSizeCommitLog
(
int
ma
pedFileSizeCommitLog
)
{
this
.
map
edFileSizeCommitLog
=
ma
pedFileSizeCommitLog
;
public
void
setMap
pedFileSizeCommitLog
(
int
map
pedFileSizeCommitLog
)
{
this
.
map
pedFileSizeCommitLog
=
map
pedFileSizeCommitLog
;
}
public
int
getMapedFileSizeConsumeQueue
()
{
public
int
getMap
p
edFileSizeConsumeQueue
()
{
int
factor
=
(
int
)
Math
.
ceil
(
this
.
mapedFileSizeConsumeQueue
/
(
ConsumeQueue
.
CQ_STORE_UNIT_SIZE
*
1.0
));
int
factor
=
(
int
)
Math
.
ceil
(
this
.
map
p
edFileSizeConsumeQueue
/
(
ConsumeQueue
.
CQ_STORE_UNIT_SIZE
*
1.0
));
return
(
int
)
(
factor
*
ConsumeQueue
.
CQ_STORE_UNIT_SIZE
);
}
public
void
setMap
edFileSizeConsumeQueue
(
int
ma
pedFileSizeConsumeQueue
)
{
this
.
map
edFileSizeConsumeQueue
=
ma
pedFileSizeConsumeQueue
;
public
void
setMap
pedFileSizeConsumeQueue
(
int
map
pedFileSizeConsumeQueue
)
{
this
.
map
pedFileSizeConsumeQueue
=
map
pedFileSizeConsumeQueue
;
}
public
boolean
isEnableConsumeQueueExt
()
{
...
...
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
浏览文件 @
6e7df557
...
...
@@ -80,7 +80,7 @@ public class DLedgerCommitLog extends CommitLog {
dLedgerConfig
.
setGroup
(
defaultMessageStore
.
getMessageStoreConfig
().
getdLegerGroup
());
dLedgerConfig
.
setPeers
(
defaultMessageStore
.
getMessageStoreConfig
().
getdLegerPeers
());
dLedgerConfig
.
setStoreBaseDir
(
defaultMessageStore
.
getMessageStoreConfig
().
getStorePathRootDir
());
dLedgerConfig
.
setMappedFileSizeForEntryData
(
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
());
dLedgerConfig
.
setMappedFileSizeForEntryData
(
defaultMessageStore
.
getMessageStoreConfig
().
getMap
p
edFileSizeCommitLog
());
dLedgerConfig
.
setDeleteWhen
(
defaultMessageStore
.
getMessageStoreConfig
().
getDeleteWhen
());
dLedgerConfig
.
setFileReservedHours
(
defaultMessageStore
.
getMessageStoreConfig
().
getFileReservedTime
()
+
1
);
id
=
Integer
.
valueOf
(
dLedgerConfig
.
getSelfId
().
substring
(
1
))
+
1
;
...
...
@@ -514,7 +514,7 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public
long
rollNextFile
(
final
long
offset
)
{
int
mappedFileSize
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
();
int
mappedFileSize
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getMap
p
edFileSizeCommitLog
();
return
offset
+
mappedFileSize
-
offset
%
mappedFileSize
;
}
...
...
store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
浏览文件 @
6e7df557
...
...
@@ -227,7 +227,7 @@ public class HAConnection {
masterOffset
=
masterOffset
-
(
masterOffset
%
HAConnection
.
this
.
haService
.
getDefaultMessageStore
().
getMessageStoreConfig
()
.
getMapedFileSizeCommitLog
());
.
getMap
p
edFileSizeCommitLog
());
if
(
masterOffset
<
0
)
{
masterOffset
=
0
;
...
...
store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
浏览文件 @
6e7df557
...
...
@@ -47,8 +47,8 @@ public class AppendCallbackTest {
@Before
public
void
init
()
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
8
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
4
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
1024
*
8
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
1024
*
4
);
messageStoreConfig
.
setMaxHashSlotNum
(
100
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
10
);
messageStoreConfig
.
setStorePathRootDir
(
System
.
getProperty
(
"user.home"
)
+
File
.
separator
+
"unitteststore"
);
...
...
store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
浏览文件 @
6e7df557
...
...
@@ -66,8 +66,8 @@ public class BatchPutMessageTest {
private
MessageStore
buildMessageStore
()
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
8
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
4
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
1024
*
8
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
1024
*
4
);
messageStoreConfig
.
setMaxHashSlotNum
(
100
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
10
);
messageStoreConfig
.
setFlushDiskType
(
FlushDiskType
.
SYNC_FLUSH
);
...
...
store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
浏览文件 @
6e7df557
...
...
@@ -86,8 +86,8 @@ public class ConsumeQueueTest {
public
MessageStoreConfig
buildStoreConfig
(
int
commitLogFileSize
,
int
cqFileSize
,
boolean
enableCqExt
,
int
cqExtFileSize
)
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
commitLogFileSize
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
cqFileSize
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
commitLogFileSize
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
cqFileSize
);
messageStoreConfig
.
setMappedFileSizeConsumeQueueExt
(
cqExtFileSize
);
messageStoreConfig
.
setMessageIndexEnable
(
false
);
messageStoreConfig
.
setEnableConsumeQueueExt
(
enableCqExt
);
...
...
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
浏览文件 @
6e7df557
...
...
@@ -281,7 +281,7 @@ public class DefaultMessageStoreCleanFilesTest {
}
private
int
getMsgCountPerConsumeQueueMappedFile
()
{
int
size
=
messageStore
.
getMessageStoreConfig
().
getMapedFileSizeConsumeQueue
();
int
size
=
messageStore
.
getMessageStoreConfig
().
getMap
p
edFileSizeConsumeQueue
();
return
size
/
CQ_STORE_UNIT_SIZE
;
// 7 in this case
}
...
...
@@ -322,8 +322,8 @@ public class DefaultMessageStoreCleanFilesTest {
private
void
initMessageStore
(
String
deleteWhen
,
int
diskMaxUsedSpaceRatio
,
double
diskSpaceCleanForciblyRatio
)
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfigForTest
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
mappedFileSize
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
mappedFileSize
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
mappedFileSize
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
mappedFileSize
);
messageStoreConfig
.
setMaxHashSlotNum
(
10000
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
100
);
messageStoreConfig
.
setFlushDiskType
(
FlushDiskType
.
SYNC_FLUSH
);
...
...
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
浏览文件 @
6e7df557
...
...
@@ -64,8 +64,8 @@ public class DefaultMessageStoreShutDownTest {
public
DefaultMessageStore
buildMessageStore
()
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMaxHashSlotNum
(
10000
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
100
);
messageStoreConfig
.
setFlushDiskType
(
FlushDiskType
.
SYNC_FLUSH
);
...
...
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
浏览文件 @
6e7df557
...
...
@@ -40,7 +40,6 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
import
org.apache.rocketmq.store.stats.BrokerStatsManager
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Ignore
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.junit.MockitoJUnitRunner
;
...
...
@@ -76,8 +75,8 @@ public class DefaultMessageStoreTest {
MessageBody
=
StoreMessage
.
getBytes
();
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
8
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
4
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
1024
*
8
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
1024
*
4
);
messageStoreConfig
.
setMaxHashSlotNum
(
100
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
10
);
MessageStore
master
=
new
DefaultMessageStore
(
messageStoreConfig
,
null
,
new
MyMessageArrivingListener
(),
new
BrokerConfig
());
...
...
@@ -106,8 +105,8 @@ public class DefaultMessageStoreTest {
private
MessageStore
buildMessageStore
()
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMaxHashSlotNum
(
10000
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
100
);
messageStoreConfig
.
setFlushDiskType
(
FlushDiskType
.
SYNC_FLUSH
);
...
...
store/src/test/java/org/apache/rocketmq/store/HATest.java
浏览文件 @
6e7df557
...
...
@@ -22,7 +22,6 @@ import org.apache.rocketmq.common.UtilAll;
import
org.apache.rocketmq.store.config.BrokerRole
;
import
org.apache.rocketmq.store.config.FlushDiskType
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.ha.HAService
;
import
org.apache.rocketmq.store.stats.BrokerStatsManager
;
import
org.junit.After
;
import
org.junit.Before
;
...
...
@@ -35,10 +34,7 @@ import java.lang.reflect.Method;
import
java.net.InetAddress
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.UUID
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
...
...
@@ -141,8 +137,8 @@ public class HATest {
}
private
void
buildMessageStoreConfig
(
MessageStoreConfig
messageStoreConfig
){
messageStoreConfig
.
setMapedFileSizeCommitLog
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
1024
*
1024
*
10
);
messageStoreConfig
.
setMaxHashSlotNum
(
10000
);
messageStoreConfig
.
setMaxIndexNum
(
100
*
100
);
messageStoreConfig
.
setFlushDiskType
(
FlushDiskType
.
SYNC_FLUSH
);
...
...
store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
浏览文件 @
6e7df557
...
...
@@ -43,8 +43,8 @@ public class MessageStoreTestBase extends StoreTestBase {
System
.
setProperty
(
"dledger.disk.ratio.clean"
,
"0.95"
);
baseDirs
.
add
(
base
);
MessageStoreConfig
storeConfig
=
new
MessageStoreConfig
();
storeConfig
.
setMapedFileSizeCommitLog
(
1024
*
100
);
storeConfig
.
setMapedFileSizeConsumeQueue
(
1024
);
storeConfig
.
setMap
p
edFileSizeCommitLog
(
1024
*
100
);
storeConfig
.
setMap
p
edFileSizeConsumeQueue
(
1024
);
storeConfig
.
setMaxHashSlotNum
(
100
);
storeConfig
.
setMaxIndexNum
(
100
*
10
);
storeConfig
.
setStorePathRootDir
(
base
);
...
...
@@ -98,8 +98,8 @@ public class MessageStoreTestBase extends StoreTestBase {
protected
DefaultMessageStore
createMessageStore
(
String
base
,
boolean
createAbort
)
throws
Exception
{
baseDirs
.
add
(
base
);
MessageStoreConfig
storeConfig
=
new
MessageStoreConfig
();
storeConfig
.
setMapedFileSizeCommitLog
(
1024
*
100
);
storeConfig
.
setMapedFileSizeConsumeQueue
(
1024
);
storeConfig
.
setMap
p
edFileSizeCommitLog
(
1024
*
100
);
storeConfig
.
setMap
p
edFileSizeConsumeQueue
(
1024
);
storeConfig
.
setMaxHashSlotNum
(
100
);
storeConfig
.
setMaxIndexNum
(
100
*
10
);
storeConfig
.
setStorePathRootDir
(
base
);
...
...
store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java
浏览文件 @
6e7df557
...
...
@@ -36,7 +36,7 @@ public class MixCommitlogTest extends MessageStoreTestBase {
{
DefaultMessageStore
originalStore
=
createMessageStore
(
base
,
false
);
doPutMessages
(
originalStore
,
topic
,
0
,
1000
,
0
);
Assert
.
assertEquals
(
11
,
originalStore
.
getMaxPhyOffset
()/
originalStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
());
Assert
.
assertEquals
(
11
,
originalStore
.
getMaxPhyOffset
()/
originalStore
.
getMessageStoreConfig
().
getMap
p
edFileSizeCommitLog
());
Thread
.
sleep
(
500
);
Assert
.
assertEquals
(
0
,
originalStore
.
getMinOffsetInQueue
(
topic
,
0
));
Assert
.
assertEquals
(
1000
,
originalStore
.
getMaxOffsetInQueue
(
topic
,
0
));
...
...
@@ -83,7 +83,7 @@ public class MixCommitlogTest extends MessageStoreTestBase {
Assert
.
assertEquals
(
1000
,
originalStore
.
getMaxOffsetInQueue
(
topic
,
0
));
Assert
.
assertEquals
(
0
,
originalStore
.
dispatchBehindBytes
());
dividedOffset
=
originalStore
.
getCommitLog
().
getMaxOffset
();
dividedOffset
=
dividedOffset
-
dividedOffset
%
originalStore
.
getMessageStoreConfig
().
getMap
edFileSizeCommitLog
()
+
originalStore
.
getMessageStoreConfig
().
getMa
pedFileSizeCommitLog
();
dividedOffset
=
dividedOffset
-
dividedOffset
%
originalStore
.
getMessageStoreConfig
().
getMap
pedFileSizeCommitLog
()
+
originalStore
.
getMessageStoreConfig
().
getMap
pedFileSizeCommitLog
();
doGetMessages
(
originalStore
,
topic
,
0
,
1000
,
0
);
originalStore
.
shutdown
();
}
...
...
@@ -144,7 +144,7 @@ public class MixCommitlogTest extends MessageStoreTestBase {
Assert
.
assertEquals
(
1000
,
originalStore
.
getMaxOffsetInQueue
(
topic
,
0
));
Assert
.
assertEquals
(
0
,
originalStore
.
dispatchBehindBytes
());
dividedOffset
=
originalStore
.
getCommitLog
().
getMaxOffset
();
dividedOffset
=
dividedOffset
-
dividedOffset
%
originalStore
.
getMessageStoreConfig
().
getMap
edFileSizeCommitLog
()
+
originalStore
.
getMessageStoreConfig
().
getMa
pedFileSizeCommitLog
();
dividedOffset
=
dividedOffset
-
dividedOffset
%
originalStore
.
getMessageStoreConfig
().
getMap
pedFileSizeCommitLog
()
+
originalStore
.
getMessageStoreConfig
().
getMap
pedFileSizeCommitLog
();
originalStore
.
shutdown
();
}
long
maxPhysicalOffset
;
...
...
store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
浏览文件 @
6e7df557
...
...
@@ -91,8 +91,8 @@ public class ScheduleMessageServiceTest {
public
void
init
()
throws
Exception
{
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMessageDelayLevel
(
testMessageDelayLevel
);
messageStoreConfig
.
setMapedFileSizeCommitLog
(
commitLogFileSize
);
messageStoreConfig
.
setMapedFileSizeConsumeQueue
(
cqFileSize
);
messageStoreConfig
.
setMap
p
edFileSizeCommitLog
(
commitLogFileSize
);
messageStoreConfig
.
setMap
p
edFileSizeConsumeQueue
(
cqFileSize
);
messageStoreConfig
.
setMappedFileSizeConsumeQueueExt
(
cqExtFileSize
);
messageStoreConfig
.
setMessageIndexEnable
(
false
);
messageStoreConfig
.
setEnableConsumeQueueExt
(
true
);
...
...
test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
浏览文件 @
6e7df557
...
...
@@ -135,7 +135,7 @@ public class IntegrationTestBase {
brokerConfig
.
setEnablePropertyFilter
(
true
);
storeConfig
.
setStorePathRootDir
(
baseDir
);
storeConfig
.
setStorePathCommitLog
(
baseDir
+
SEP
+
"commitlog"
);
storeConfig
.
setMapedFileSizeCommitLog
(
COMMIT_LOG_SIZE
);
storeConfig
.
setMap
p
edFileSizeCommitLog
(
COMMIT_LOG_SIZE
);
storeConfig
.
setMaxIndexNum
(
INDEX_NUM
);
storeConfig
.
setMaxHashSlotNum
(
INDEX_NUM
*
4
);
return
createAndStartBroker
(
storeConfig
,
brokerConfig
);
...
...
test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
浏览文件 @
6e7df557
...
...
@@ -35,7 +35,6 @@ import org.apache.rocketmq.test.base.IntegrationTestBase;
import
org.apache.rocketmq.test.factory.ConsumerFactory
;
import
org.apache.rocketmq.test.factory.ProducerFactory
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
import
static
org
.
apache
.
rocketmq
.
test
.
base
.
IntegrationTestBase
.
nextPort
;
...
...
@@ -58,7 +57,7 @@ public class DLedgerProduceAndConsumeIT {
storeConfig
.
setStorePathRootDir
(
baseDir
);
storeConfig
.
setStorePathCommitLog
(
baseDir
+
SEP
+
"commitlog"
);
storeConfig
.
setHaListenPort
(
nextPort
());
storeConfig
.
setMapedFileSizeCommitLog
(
10
*
1024
*
1024
);
storeConfig
.
setMap
p
edFileSizeCommitLog
(
10
*
1024
*
1024
);
storeConfig
.
setEnableDLegerCommitLog
(
true
);
storeConfig
.
setdLegerGroup
(
brokerName
);
storeConfig
.
setdLegerSelfId
(
selfId
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录