Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
e9814ad4
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
e9814ad4
编写于
5月 07, 2017
作者:
Z
Zhanhui Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add javadoc to message store.
上级
6898d96c
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
259 addition
and
46 deletion
+259
-46
broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
.../org/apache/rocketmq/broker/client/net/Broker2Client.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
...e/rocketmq/broker/longpolling/PullRequestHoldService.java
+2
-2
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
.../apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+2
-2
broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
...he/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+10
-10
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
...pache/rocketmq/broker/processor/AdminBrokerProcessor.java
+8
-8
broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
...he/rocketmq/broker/processor/ConsumerManageProcessor.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+8
-8
store/src/main/java/org/apache/rocketmq/store/MessageStore.java
...src/main/java/org/apache/rocketmq/store/MessageStore.java
+226
-13
store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
...pache/rocketmq/store/schedule/ScheduleMessageService.java
+1
-1
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
浏览文件 @
e9814ad4
...
@@ -149,7 +149,7 @@ public class Broker2Client {
...
@@ -149,7 +149,7 @@ public class Broker2Client {
long
timeStampOffset
;
long
timeStampOffset
;
if
(
timeStamp
==
-
1
)
{
if
(
timeStamp
==
-
1
)
{
timeStampOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
topic
,
i
);
timeStampOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
topic
,
i
);
}
else
{
}
else
{
timeStampOffset
=
this
.
brokerController
.
getMessageStore
().
getOffsetInQueueByTime
(
topic
,
i
,
timeStamp
);
timeStampOffset
=
this
.
brokerController
.
getMessageStore
().
getOffsetInQueueByTime
(
topic
,
i
,
timeStamp
);
}
}
...
...
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
浏览文件 @
e9814ad4
...
@@ -98,7 +98,7 @@ public class PullRequestHoldService extends ServiceThread {
...
@@ -98,7 +98,7 @@ public class PullRequestHoldService extends ServiceThread {
if
(
2
==
kArray
.
length
)
{
if
(
2
==
kArray
.
length
)
{
String
topic
=
kArray
[
0
];
String
topic
=
kArray
[
0
];
int
queueId
=
Integer
.
parseInt
(
kArray
[
1
]);
int
queueId
=
Integer
.
parseInt
(
kArray
[
1
]);
final
long
offset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
topic
,
queueId
);
final
long
offset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
topic
,
queueId
);
try
{
try
{
this
.
notifyMessageArriving
(
topic
,
queueId
,
offset
);
this
.
notifyMessageArriving
(
topic
,
queueId
,
offset
);
}
catch
(
Throwable
e
)
{
}
catch
(
Throwable
e
)
{
...
@@ -124,7 +124,7 @@ public class PullRequestHoldService extends ServiceThread {
...
@@ -124,7 +124,7 @@ public class PullRequestHoldService extends ServiceThread {
for
(
PullRequest
request
:
requestList
)
{
for
(
PullRequest
request
:
requestList
)
{
long
newestOffset
=
maxOffset
;
long
newestOffset
=
maxOffset
;
if
(
newestOffset
<=
request
.
getPullFromThisOffset
())
{
if
(
newestOffset
<=
request
.
getPullFromThisOffset
())
{
newestOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
topic
,
queueId
);
newestOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
topic
,
queueId
);
}
}
if
(
newestOffset
>
request
.
getPullFromThisOffset
())
{
if
(
newestOffset
>
request
.
getPullFromThisOffset
())
{
...
...
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
浏览文件 @
e9814ad4
...
@@ -73,7 +73,7 @@ public class ConsumerOffsetManager extends ConfigManager {
...
@@ -73,7 +73,7 @@ public class ConsumerOffsetManager extends ConfigManager {
while
(
it
.
hasNext
()
&&
result
)
{
while
(
it
.
hasNext
()
&&
result
)
{
Entry
<
Integer
,
Long
>
next
=
it
.
next
();
Entry
<
Integer
,
Long
>
next
=
it
.
next
();
long
minOffsetInStore
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
q
ue
(
topic
,
next
.
getKey
());
long
minOffsetInStore
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
e
ue
(
topic
,
next
.
getKey
());
long
offsetInPersist
=
next
.
getValue
();
long
offsetInPersist
=
next
.
getValue
();
result
=
offsetInPersist
<=
minOffsetInStore
;
result
=
offsetInPersist
<=
minOffsetInStore
;
}
}
...
@@ -201,7 +201,7 @@ public class ConsumerOffsetManager extends ConfigManager {
...
@@ -201,7 +201,7 @@ public class ConsumerOffsetManager extends ConfigManager {
String
[]
topicGroupArr
=
topicGroup
.
split
(
TOPIC_GROUP_SEPARATOR
);
String
[]
topicGroupArr
=
topicGroup
.
split
(
TOPIC_GROUP_SEPARATOR
);
if
(
topic
.
equals
(
topicGroupArr
[
0
]))
{
if
(
topic
.
equals
(
topicGroupArr
[
0
]))
{
for
(
Entry
<
Integer
,
Long
>
entry
:
offSetEntry
.
getValue
().
entrySet
())
{
for
(
Entry
<
Integer
,
Long
>
entry
:
offSetEntry
.
getValue
().
entrySet
())
{
long
minOffset
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
q
ue
(
topic
,
entry
.
getKey
());
long
minOffset
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
e
ue
(
topic
,
entry
.
getKey
());
if
(
entry
.
getValue
()
>=
minOffset
)
{
if
(
entry
.
getValue
()
>=
minOffset
)
{
Long
offset
=
queueMinOffset
.
get
(
entry
.
getKey
());
Long
offset
=
queueMinOffset
.
get
(
entry
.
getKey
());
if
(
offset
==
null
)
{
if
(
offset
==
null
)
{
...
...
broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
浏览文件 @
e9814ad4
...
@@ -92,18 +92,18 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
...
@@ -92,18 +92,18 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
}
@Override
@Override
public
long
getMaxOffsetInQu
q
ue
(
String
topic
,
int
queueId
)
{
public
long
getMaxOffsetInQu
e
ue
(
String
topic
,
int
queueId
)
{
return
next
.
getMaxOffsetInQu
q
ue
(
topic
,
queueId
);
return
next
.
getMaxOffsetInQu
e
ue
(
topic
,
queueId
);
}
}
@Override
@Override
public
long
getMinOffsetInQu
q
ue
(
String
topic
,
int
queueId
)
{
public
long
getMinOffsetInQu
e
ue
(
String
topic
,
int
queueId
)
{
return
next
.
getMinOffsetInQu
q
ue
(
topic
,
queueId
);
return
next
.
getMinOffsetInQu
e
ue
(
topic
,
queueId
);
}
}
@Override
@Override
public
long
getCommitLogOffsetInQueue
(
String
topic
,
int
queueId
,
long
c
q
Offset
)
{
public
long
getCommitLogOffsetInQueue
(
String
topic
,
int
queueId
,
long
c
onsumeQueue
Offset
)
{
return
next
.
getCommitLogOffsetInQueue
(
topic
,
queueId
,
c
q
Offset
);
return
next
.
getCommitLogOffsetInQueue
(
topic
,
queueId
,
c
onsumeQueue
Offset
);
}
}
@Override
@Override
...
@@ -152,8 +152,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
...
@@ -152,8 +152,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
}
@Override
@Override
public
long
getMessageStoreTimeStamp
(
String
topic
,
int
queueId
,
long
o
ffset
)
{
public
long
getMessageStoreTimeStamp
(
String
topic
,
int
queueId
,
long
consumeQueueO
ffset
)
{
return
next
.
getMessageStoreTimeStamp
(
topic
,
queueId
,
o
ffset
);
return
next
.
getMessageStoreTimeStamp
(
topic
,
queueId
,
consumeQueueO
ffset
);
}
}
@Override
@Override
...
@@ -172,8 +172,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
...
@@ -172,8 +172,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
}
@Override
@Override
public
void
ex
cuteDeleteFilesManua
ly
()
{
public
void
ex
ecuteDeleteFilesManual
ly
()
{
next
.
ex
cuteDeleteFilesManua
ly
();
next
.
ex
ecuteDeleteFilesManual
ly
();
}
}
@Override
@Override
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
浏览文件 @
e9814ad4
...
@@ -376,7 +376,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
...
@@ -376,7 +376,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final
GetMaxOffsetRequestHeader
requestHeader
=
final
GetMaxOffsetRequestHeader
requestHeader
=
(
GetMaxOffsetRequestHeader
)
request
.
decodeCommandCustomHeader
(
GetMaxOffsetRequestHeader
.
class
);
(
GetMaxOffsetRequestHeader
)
request
.
decodeCommandCustomHeader
(
GetMaxOffsetRequestHeader
.
class
);
long
offset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
long
offset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
responseHeader
.
setOffset
(
offset
);
responseHeader
.
setOffset
(
offset
);
...
@@ -391,7 +391,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
...
@@ -391,7 +391,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final
GetMinOffsetRequestHeader
requestHeader
=
final
GetMinOffsetRequestHeader
requestHeader
=
(
GetMinOffsetRequestHeader
)
request
.
decodeCommandCustomHeader
(
GetMinOffsetRequestHeader
.
class
);
(
GetMinOffsetRequestHeader
)
request
.
decodeCommandCustomHeader
(
GetMinOffsetRequestHeader
.
class
);
long
offset
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
q
ue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
long
offset
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
e
ue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
responseHeader
.
setOffset
(
offset
);
responseHeader
.
setOffset
(
offset
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
...
@@ -537,11 +537,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
...
@@ -537,11 +537,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
mq
.
setQueueId
(
i
);
mq
.
setQueueId
(
i
);
TopicOffset
topicOffset
=
new
TopicOffset
();
TopicOffset
topicOffset
=
new
TopicOffset
();
long
min
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
q
ue
(
topic
,
i
);
long
min
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
e
ue
(
topic
,
i
);
if
(
min
<
0
)
if
(
min
<
0
)
min
=
0
;
min
=
0
;
long
max
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
topic
,
i
);
long
max
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
topic
,
i
);
if
(
max
<
0
)
if
(
max
<
0
)
max
=
0
;
max
=
0
;
...
@@ -679,7 +679,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
...
@@ -679,7 +679,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
OffsetWrapper
offsetWrapper
=
new
OffsetWrapper
();
OffsetWrapper
offsetWrapper
=
new
OffsetWrapper
();
long
brokerOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
topic
,
i
);
long
brokerOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
topic
,
i
);
if
(
brokerOffset
<
0
)
if
(
brokerOffset
<
0
)
brokerOffset
=
0
;
brokerOffset
=
0
;
...
@@ -862,7 +862,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
...
@@ -862,7 +862,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
long
minTime
=
this
.
brokerController
.
getMessageStore
().
getEarliestMessageTime
(
topic
,
i
);
long
minTime
=
this
.
brokerController
.
getMessageStore
().
getEarliestMessageTime
(
topic
,
i
);
timeSpan
.
setMinTimeStamp
(
minTime
);
timeSpan
.
setMinTimeStamp
(
minTime
);
long
max
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
topic
,
i
);
long
max
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
topic
,
i
);
long
maxTime
=
this
.
brokerController
.
getMessageStore
().
getMessageStoreTimeStamp
(
topic
,
i
,
max
-
1
);
long
maxTime
=
this
.
brokerController
.
getMessageStore
().
getMessageStoreTimeStamp
(
topic
,
i
,
max
-
1
);
timeSpan
.
setMaxTimeStamp
(
maxTime
);
timeSpan
.
setMaxTimeStamp
(
maxTime
);
...
@@ -876,7 +876,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
...
@@ -876,7 +876,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
}
timeSpan
.
setConsumeTimeStamp
(
consumeTime
);
timeSpan
.
setConsumeTimeStamp
(
consumeTime
);
long
maxBrokerOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
requestHeader
.
getTopic
(),
i
);
long
maxBrokerOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
requestHeader
.
getTopic
(),
i
);
if
(
consumerOffset
<
maxBrokerOffset
)
{
if
(
consumerOffset
<
maxBrokerOffset
)
{
long
nextTime
=
this
.
brokerController
.
getMessageStore
().
getMessageStoreTimeStamp
(
topic
,
i
,
consumerOffset
);
long
nextTime
=
this
.
brokerController
.
getMessageStore
().
getMessageStoreTimeStamp
(
topic
,
i
,
consumerOffset
);
timeSpan
.
setDelayTime
(
System
.
currentTimeMillis
()
-
nextTime
);
timeSpan
.
setDelayTime
(
System
.
currentTimeMillis
()
-
nextTime
);
...
@@ -1126,7 +1126,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
...
@@ -1126,7 +1126,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
mq
.
setBrokerName
(
this
.
brokerController
.
getBrokerConfig
().
getBrokerName
());
mq
.
setBrokerName
(
this
.
brokerController
.
getBrokerConfig
().
getBrokerName
());
mq
.
setQueueId
(
i
);
mq
.
setQueueId
(
i
);
OffsetWrapper
offsetWrapper
=
new
OffsetWrapper
();
OffsetWrapper
offsetWrapper
=
new
OffsetWrapper
();
long
brokerOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
q
ue
(
topic
,
i
);
long
brokerOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQu
e
ue
(
topic
,
i
);
if
(
brokerOffset
<
0
)
if
(
brokerOffset
<
0
)
brokerOffset
=
0
;
brokerOffset
=
0
;
long
consumerOffset
=
this
.
brokerController
.
getConsumerOffsetManager
().
queryOffset
(
//
long
consumerOffset
=
this
.
brokerController
.
getConsumerOffsetManager
().
queryOffset
(
//
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
浏览文件 @
e9814ad4
...
@@ -135,7 +135,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
...
@@ -135,7 +135,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
response
.
setRemark
(
null
);
response
.
setRemark
(
null
);
}
else
{
}
else
{
long
minOffset
=
long
minOffset
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
q
ue
(
requestHeader
.
getTopic
(),
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQu
e
ue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
requestHeader
.
getQueueId
());
if
(
minOffset
<=
0
if
(
minOffset
<=
0
&&
!
this
.
brokerController
.
getMessageStore
().
checkInDiskByConsumeOffset
(
&&
!
this
.
brokerController
.
getMessageStore
().
checkInDiskByConsumeOffset
(
...
...
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
e9814ad4
...
@@ -580,7 +580,7 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -580,7 +580,7 @@ public class DefaultMessageStore implements MessageStore {
/**
/**
*/
*/
public
long
getMaxOffsetInQu
q
ue
(
String
topic
,
int
queueId
)
{
public
long
getMaxOffsetInQu
e
ue
(
String
topic
,
int
queueId
)
{
ConsumeQueue
logic
=
this
.
findConsumeQueue
(
topic
,
queueId
);
ConsumeQueue
logic
=
this
.
findConsumeQueue
(
topic
,
queueId
);
if
(
logic
!=
null
)
{
if
(
logic
!=
null
)
{
long
offset
=
logic
.
getMaxOffsetInQueue
();
long
offset
=
logic
.
getMaxOffsetInQueue
();
...
@@ -593,7 +593,7 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -593,7 +593,7 @@ public class DefaultMessageStore implements MessageStore {
/**
/**
*/
*/
public
long
getMinOffsetInQu
q
ue
(
String
topic
,
int
queueId
)
{
public
long
getMinOffsetInQu
e
ue
(
String
topic
,
int
queueId
)
{
ConsumeQueue
logic
=
this
.
findConsumeQueue
(
topic
,
queueId
);
ConsumeQueue
logic
=
this
.
findConsumeQueue
(
topic
,
queueId
);
if
(
logic
!=
null
)
{
if
(
logic
!=
null
)
{
return
logic
.
getMinOffsetInQueue
();
return
logic
.
getMinOffsetInQueue
();
...
@@ -603,10 +603,10 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -603,10 +603,10 @@ public class DefaultMessageStore implements MessageStore {
}
}
@Override
@Override
public
long
getCommitLogOffsetInQueue
(
String
topic
,
int
queueId
,
long
c
q
Offset
)
{
public
long
getCommitLogOffsetInQueue
(
String
topic
,
int
queueId
,
long
c
onsumeQueue
Offset
)
{
ConsumeQueue
consumeQueue
=
findConsumeQueue
(
topic
,
queueId
);
ConsumeQueue
consumeQueue
=
findConsumeQueue
(
topic
,
queueId
);
if
(
consumeQueue
!=
null
)
{
if
(
consumeQueue
!=
null
)
{
SelectMappedBufferResult
bufferConsumeQueue
=
consumeQueue
.
getIndexBuffer
(
c
q
Offset
);
SelectMappedBufferResult
bufferConsumeQueue
=
consumeQueue
.
getIndexBuffer
(
c
onsumeQueue
Offset
);
if
(
bufferConsumeQueue
!=
null
)
{
if
(
bufferConsumeQueue
!=
null
)
{
try
{
try
{
long
offsetPy
=
bufferConsumeQueue
.
getByteBuffer
().
getLong
();
long
offsetPy
=
bufferConsumeQueue
.
getByteBuffer
().
getLong
();
...
@@ -740,10 +740,10 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -740,10 +740,10 @@ public class DefaultMessageStore implements MessageStore {
}
}
@Override
@Override
public
long
getMessageStoreTimeStamp
(
String
topic
,
int
queueId
,
long
o
ffset
)
{
public
long
getMessageStoreTimeStamp
(
String
topic
,
int
queueId
,
long
consumeQueueO
ffset
)
{
ConsumeQueue
logicQueue
=
this
.
findConsumeQueue
(
topic
,
queueId
);
ConsumeQueue
logicQueue
=
this
.
findConsumeQueue
(
topic
,
queueId
);
if
(
logicQueue
!=
null
)
{
if
(
logicQueue
!=
null
)
{
SelectMappedBufferResult
result
=
logicQueue
.
getIndexBuffer
(
o
ffset
);
SelectMappedBufferResult
result
=
logicQueue
.
getIndexBuffer
(
consumeQueueO
ffset
);
if
(
result
!=
null
)
{
if
(
result
!=
null
)
{
try
{
try
{
final
long
phyOffset
=
result
.
getByteBuffer
().
getLong
();
final
long
phyOffset
=
result
.
getByteBuffer
().
getLong
();
...
@@ -798,7 +798,7 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -798,7 +798,7 @@ public class DefaultMessageStore implements MessageStore {
}
}
@Override
@Override
public
void
ex
cuteDeleteFilesManua
ly
()
{
public
void
ex
ecuteDeleteFilesManual
ly
()
{
this
.
cleanCommitLogService
.
excuteDeleteFilesManualy
();
this
.
cleanCommitLogService
.
excuteDeleteFilesManualy
();
}
}
...
@@ -1434,7 +1434,7 @@ public class DefaultMessageStore implements MessageStore {
...
@@ -1434,7 +1434,7 @@ public class DefaultMessageStore implements MessageStore {
public
void
excuteDeleteFilesManualy
()
{
public
void
excuteDeleteFilesManualy
()
{
this
.
manualDeleteFileSeveralTimes
=
MAX_MANUAL_DELETE_FILE_TIMES
;
this
.
manualDeleteFileSeveralTimes
=
MAX_MANUAL_DELETE_FILE_TIMES
;
DefaultMessageStore
.
log
.
info
(
"ex
cuteDeleteFilesManua
ly was invoked"
);
DefaultMessageStore
.
log
.
info
(
"ex
ecuteDeleteFilesManual
ly was invoked"
);
}
}
public
void
run
()
{
public
void
run
()
{
...
...
store/src/main/java/org/apache/rocketmq/store/MessageStore.java
浏览文件 @
e9814ad4
...
@@ -22,91 +22,304 @@ import java.util.Set;
...
@@ -22,91 +22,304 @@ import java.util.Set;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
import
org.apache.rocketmq.common.message.MessageExtBatch
;
/**
* This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store.
*/
public
interface
MessageStore
{
public
interface
MessageStore
{
/**
* Load previously stored messages.
* @return true if success; false otherwise.
*/
boolean
load
();
boolean
load
();
/**
* Launch this message store.
* @throws Exception if there is any error.
*/
void
start
()
throws
Exception
;
void
start
()
throws
Exception
;
/**
* Shutdown this message store.
*/
void
shutdown
();
void
shutdown
();
/**
* Destroy this message store. Generally, all persistent files should be removed after invocation.
*/
void
destroy
();
void
destroy
();
/**
* Store a message into store.
* @param msg Message instance to store
* @return result of store operation.
*/
PutMessageResult
putMessage
(
final
MessageExtBrokerInner
msg
);
PutMessageResult
putMessage
(
final
MessageExtBrokerInner
msg
);
/**
* Store a batch of messages.
* @param messageExtBatch Message batch.
* @return result of storing batch messages.
*/
PutMessageResult
putMessages
(
final
MessageExtBatch
messageExtBatch
);
PutMessageResult
putMessages
(
final
MessageExtBatch
messageExtBatch
);
/**
* Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
* from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
*
* @param group Consumer group that launches this query.
* @param topic Topic to query.
* @param queueId Queue ID to query.
* @param offset Logical offset to start from.
* @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
GetMessageResult
getMessage
(
final
String
group
,
final
String
topic
,
final
int
queueId
,
GetMessageResult
getMessage
(
final
String
group
,
final
String
topic
,
final
int
queueId
,
final
long
offset
,
final
int
maxMsgNums
,
final
MessageFilter
messageFilter
);
final
long
offset
,
final
int
maxMsgNums
,
final
MessageFilter
messageFilter
);
long
getMaxOffsetInQuque
(
final
String
topic
,
final
int
queueId
);
/**
* Get maximum offset of the topic queue.
long
getMinOffsetInQuque
(
final
String
topic
,
final
int
queueId
);
* @param topic Topic name.
* @param queueId Queue ID.
long
getCommitLogOffsetInQueue
(
final
String
topic
,
final
int
queueId
,
final
long
cqOffset
);
* @return Maximum offset at present.
*/
long
getMaxOffsetInQueue
(
final
String
topic
,
final
int
queueId
);
/**
* Get the minimum offset of the topic queue.
* @param topic Topic name.
* @param queueId Queue ID.
* @return Minimum offset at present.
*/
long
getMinOffsetInQueue
(
final
String
topic
,
final
int
queueId
);
/**
* Get the offset of the message in the commit log, which is also known as physical offset.
* @param topic Topic of the message to lookup.
* @param queueId Queue ID.
* @param consumeQueueOffset offset of consume queue.
* @return physical offset.
*/
long
getCommitLogOffsetInQueue
(
final
String
topic
,
final
int
queueId
,
final
long
consumeQueueOffset
);
/**
* Look up the physical offset of the message whose store timestamp is as specified.
* @param topic Topic of the message.
* @param queueId Queue ID.
* @param timestamp Timestamp to look up.
* @return physical offset which matches.
*/
long
getOffsetInQueueByTime
(
final
String
topic
,
final
int
queueId
,
final
long
timestamp
);
long
getOffsetInQueueByTime
(
final
String
topic
,
final
int
queueId
,
final
long
timestamp
);
/**
* Look up the message by given commit log offset.
* @param commitLogOffset physical offset.
* @return Message whose physical offset is as specified.
*/
MessageExt
lookMessageByOffset
(
final
long
commitLogOffset
);
MessageExt
lookMessageByOffset
(
final
long
commitLogOffset
);
/**
* Get one message from the specified commit log offset.
* @param commitLogOffset commit log offset.
* @return wrapped result of the message.
*/
SelectMappedBufferResult
selectOneMessageByOffset
(
final
long
commitLogOffset
);
SelectMappedBufferResult
selectOneMessageByOffset
(
final
long
commitLogOffset
);
/**
* Get one message from the specified commit log offset.
* @param commitLogOffset commit log offset.
* @param msgSize message size.
* @return wrapped result of the message.
*/
SelectMappedBufferResult
selectOneMessageByOffset
(
final
long
commitLogOffset
,
final
int
msgSize
);
SelectMappedBufferResult
selectOneMessageByOffset
(
final
long
commitLogOffset
,
final
int
msgSize
);
/**
* Get the running information of this store.
* @return message store running info.
*/
String
getRunningDataInfo
();
String
getRunningDataInfo
();
/**
* Message store runtime information, which should generally contains various statistical information.
* @return runtime information of the message store in format of key-value pairs.
*/
HashMap
<
String
,
String
>
getRuntimeInfo
();
HashMap
<
String
,
String
>
getRuntimeInfo
();
/**
* Get the maximum commit log offset.
* @return maximum commit log offset.
*/
long
getMaxPhyOffset
();
long
getMaxPhyOffset
();
/**
* Get the minimum commit log offset.
* @return minimum commit log offset.
*/
long
getMinPhyOffset
();
long
getMinPhyOffset
();
/**
* Get the store time of the earliest message in the given queue.
* @param topic Topic of the messages to query.
* @param queueId Queue ID to find.
* @return store time of the earliest message.
*/
long
getEarliestMessageTime
(
final
String
topic
,
final
int
queueId
);
long
getEarliestMessageTime
(
final
String
topic
,
final
int
queueId
);
/**
* Get the store time of the earliest message in this store.
* @return timestamp of the earliest message in this store.
*/
long
getEarliestMessageTime
();
long
getEarliestMessageTime
();
long
getMessageStoreTimeStamp
(
final
String
topic
,
final
int
queueId
,
final
long
offset
);
/**
* Get the store time of the message specified.
* @param topic message topic.
* @param queueId queue ID.
* @param consumeQueueOffset consume queue offset.
* @return store timestamp of the message.
*/
long
getMessageStoreTimeStamp
(
final
String
topic
,
final
int
queueId
,
final
long
consumeQueueOffset
);
/**
* Get the total number of the messages in the specified queue.
* @param topic Topic
* @param queueId Queue ID.
* @return total number.
*/
long
getMessageTotalInQueue
(
final
String
topic
,
final
int
queueId
);
long
getMessageTotalInQueue
(
final
String
topic
,
final
int
queueId
);
/**
* Get the raw commit log data starting from the given offset, which should used for replication purpose.
* @param offset starting offset.
* @return commit log data.
*/
SelectMappedBufferResult
getCommitLogData
(
final
long
offset
);
SelectMappedBufferResult
getCommitLogData
(
final
long
offset
);
/**
* Append data to commit log.
* @param startOffset starting offset.
* @param data data to append.
* @return true if success; false otherwise.
*/
boolean
appendToCommitLog
(
final
long
startOffset
,
final
byte
[]
data
);
boolean
appendToCommitLog
(
final
long
startOffset
,
final
byte
[]
data
);
void
excuteDeleteFilesManualy
();
/**
* Execute file deletion manually.
QueryMessageResult
queryMessage
(
final
String
topic
,
final
String
key
,
final
int
maxNum
,
*/
final
long
begin
,
final
long
end
);
void
executeDeleteFilesManually
();
/**
* Query messages by given key.
* @param topic topic of the message.
* @param key message key.
* @param maxNum maximum number of the messages possible.
* @param begin begin timestamp.
* @param end end timestamp.
* @return
*/
QueryMessageResult
queryMessage
(
final
String
topic
,
final
String
key
,
final
int
maxNum
,
final
long
begin
,
final
long
end
);
/**
* Update HA master address.
* @param newAddr new address.
*/
void
updateHaMasterAddress
(
final
String
newAddr
);
void
updateHaMasterAddress
(
final
String
newAddr
);
/**
* Return how much the slave falls behind.
* @return number of bytes that slave falls behind.
*/
long
slaveFallBehindMuch
();
long
slaveFallBehindMuch
();
/**
* Return the current timestamp of the store.
* @return current time in milliseconds since 1970-01-01.
*/
long
now
();
long
now
();
/**
* Clean unused topics.
* @param topics all valid topics.
* @return number of the topics deleted.
*/
int
cleanUnusedTopic
(
final
Set
<
String
>
topics
);
int
cleanUnusedTopic
(
final
Set
<
String
>
topics
);
/**
* Clean expired consume queues.
*/
void
cleanExpiredConsumerQueue
();
void
cleanExpiredConsumerQueue
();
/**
* Check if the given message has been swapped out of the memory.
* @param topic topic.
* @param queueId queue ID.
* @param consumeOffset consume queue offset.
* @return true if the message is no longer in memory; false otherwise.
*/
boolean
checkInDiskByConsumeOffset
(
final
String
topic
,
final
int
queueId
,
long
consumeOffset
);
boolean
checkInDiskByConsumeOffset
(
final
String
topic
,
final
int
queueId
,
long
consumeOffset
);
/**
* Get number of the bytes that have been stored in commit log and not yet dispatched to consume queue.
* @return number of the bytes to dispatch.
*/
long
dispatchBehindBytes
();
long
dispatchBehindBytes
();
/**
* Flush the message store to persist all data.
* @return maximum offset flushed to persistent storage device.
*/
long
flush
();
long
flush
();
/**
* Reset written offset.
* @param phyOffset new offset.
* @return true if success; false otherwise.
*/
boolean
resetWriteOffset
(
long
phyOffset
);
boolean
resetWriteOffset
(
long
phyOffset
);
/**
* Get confirm offset.
* @return confirm offset.
*/
long
getConfirmOffset
();
long
getConfirmOffset
();
/**
* Set confirm offset.
* @param phyOffset confirm offset to set.
*/
void
setConfirmOffset
(
long
phyOffset
);
void
setConfirmOffset
(
long
phyOffset
);
/**
* Check if the operation system page cache is busy or not.
* @return true if the OS page cache is busy; false otherwise.
*/
boolean
isOSPageCacheBusy
();
boolean
isOSPageCacheBusy
();
/**
* Get lock time in milliseconds of the store by far.
* @return lock time in milliseconds.
*/
long
lockTimeMills
();
long
lockTimeMills
();
/**
* Check if the transient store pool is deficient.
* @return true if the transient store pool is running out; false otherwise.
*/
boolean
isTransientStorePoolDeficient
();
boolean
isTransientStorePoolDeficient
();
/**
* Get the dispatcher list.
* @return list of the dispatcher.
*/
LinkedList
<
CommitLogDispatcher
>
getDispatcherList
();
LinkedList
<
CommitLogDispatcher
>
getDispatcherList
();
/**
* Get consume queue of the topic/queue.
* @param topic Topic.
* @param queueId Queue ID.
* @return Consume queue.
*/
ConsumeQueue
getConsumeQueue
(
String
topic
,
int
queueId
);
ConsumeQueue
getConsumeQueue
(
String
topic
,
int
queueId
);
}
}
store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
浏览文件 @
e9814ad4
...
@@ -79,7 +79,7 @@ public class ScheduleMessageService extends ConfigManager {
...
@@ -79,7 +79,7 @@ public class ScheduleMessageService extends ConfigManager {
Entry
<
Integer
,
Long
>
next
=
it
.
next
();
Entry
<
Integer
,
Long
>
next
=
it
.
next
();
int
queueId
=
delayLevel2QueueId
(
next
.
getKey
());
int
queueId
=
delayLevel2QueueId
(
next
.
getKey
());
long
delayOffset
=
next
.
getValue
();
long
delayOffset
=
next
.
getValue
();
long
maxOffset
=
this
.
defaultMessageStore
.
getMaxOffsetInQu
q
ue
(
SCHEDULE_TOPIC
,
queueId
);
long
maxOffset
=
this
.
defaultMessageStore
.
getMaxOffsetInQu
e
ue
(
SCHEDULE_TOPIC
,
queueId
);
String
value
=
String
.
format
(
"%d,%d"
,
delayOffset
,
maxOffset
);
String
value
=
String
.
format
(
"%d,%d"
,
delayOffset
,
maxOffset
);
String
key
=
String
.
format
(
"%s_%d"
,
RunningStats
.
scheduleMessageOffset
.
name
(),
next
.
getKey
());
String
key
=
String
.
format
(
"%s_%d"
,
RunningStats
.
scheduleMessageOffset
.
name
(),
next
.
getKey
());
stats
.
put
(
key
,
value
);
stats
.
put
(
key
,
value
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录