Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
9edeb4ec
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
9edeb4ec
编写于
8月 22, 2019
作者:
H
Heng Du
提交者:
GitHub
8月 22, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1386 from apache/litePullConsumer
[ISSUE #1388]Add lite pull consumer support for RocketMQ
上级
90ae1167
67751d81
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
2668 addition
and
38 deletion
+2668
-38
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
...rc/main/java/org/apache/rocketmq/client/ClientConfig.java
+16
-2
client/src/main/java/org/apache/rocketmq/client/Validators.java
.../src/main/java/org/apache/rocketmq/client/Validators.java
+6
-3
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
...che/rocketmq/client/consumer/DefaultLitePullConsumer.java
+414
-0
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
...pache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+5
-1
client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
...org/apache/rocketmq/client/consumer/LitePullConsumer.java
+175
-0
client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
...a/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+1
-0
client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
...cketmq/client/consumer/MQPullConsumerScheduleService.java
+4
-2
client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java
...etmq/client/consumer/TopicMessageQueueChangeListener.java
+30
-0
client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
...cketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+19
-23
client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
...e/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+241
-0
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
...tmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+1074
-0
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
...ketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+6
-1
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
...rg/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+2
-0
client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
...g/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+3
-1
client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
.../rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+82
-0
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
...apache/rocketmq/client/impl/factory/MQClientInstance.java
+0
-4
client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
...rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+496
-0
example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
...a/org/apache/rocketmq/example/broadcast/PushConsumer.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java
...pache/rocketmq/example/simple/LitePullConsumerAssign.java
+53
-0
example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
...he/rocketmq/example/simple/LitePullConsumerSubscribe.java
+40
-0
未找到文件。
client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
浏览文件 @
9edeb4ec
...
...
@@ -16,7 +16,9 @@
*/
package
org.apache.rocketmq.client
;
import
java.util.Collection
;
import
java.util.HashSet
;
import
java.util.Iterator
;
import
java.util.Set
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.rocketmq.common.UtilAll
;
...
...
@@ -95,7 +97,6 @@ public class ClientConfig {
}
}
public
String
withNamespace
(
String
resource
)
{
return
NamespaceUtil
.
wrapNamespace
(
this
.
getNamespace
(),
resource
);
}
...
...
@@ -124,9 +125,21 @@ public class ClientConfig {
if
(
StringUtils
.
isEmpty
(
this
.
getNamespace
()))
{
return
queue
;
}
return
new
MessageQueue
(
withNamespace
(
queue
.
getTopic
()),
queue
.
getBrokerName
(),
queue
.
getQueueId
());
}
public
Collection
<
MessageQueue
>
queuesWithNamespace
(
Collection
<
MessageQueue
>
queues
)
{
if
(
StringUtils
.
isEmpty
(
this
.
getNamespace
()))
{
return
queues
;
}
Iterator
<
MessageQueue
>
iter
=
queues
.
iterator
();
while
(
iter
.
hasNext
())
{
MessageQueue
queue
=
iter
.
next
();
queue
.
setTopic
(
withNamespace
(
queue
.
getTopic
()));
}
return
queues
;
}
public
void
resetClientConfig
(
final
ClientConfig
cc
)
{
this
.
namesrvAddr
=
cc
.
namesrvAddr
;
this
.
clientIP
=
cc
.
clientIP
;
...
...
@@ -170,6 +183,7 @@ public class ClientConfig {
/**
* Domain name mode access way does not support the delimiter(;), and only one domain name can be set.
*
* @param namesrvAddr name server address
*/
public
void
setNamesrvAddr
(
String
namesrvAddr
)
{
...
...
client/src/main/java/org/apache/rocketmq/client/Validators.java
浏览文件 @
9edeb4ec
...
...
@@ -53,14 +53,17 @@ public class Validators {
if
(
UtilAll
.
isBlank
(
group
))
{
throw
new
MQClientException
(
"the specified group is blank"
,
null
);
}
if
(
group
.
length
()
>
CHARACTER_MAX_LENGTH
)
{
throw
new
MQClientException
(
"the specified group is longer than group max length 255."
,
null
);
}
if
(!
regularExpressionMatcher
(
group
,
PATTERN
))
{
throw
new
MQClientException
(
String
.
format
(
"the specified group[%s] contains illegal characters, allowing only %s"
,
group
,
VALID_PATTERN_STR
),
null
);
}
if
(
group
.
length
()
>
CHARACTER_MAX_LENGTH
)
{
throw
new
MQClientException
(
"the specified group is longer than group max length 255."
,
null
);
}
}
/**
...
...
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.consumer
;
import
java.util.Collection
;
import
java.util.List
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely
;
import
org.apache.rocketmq.client.consumer.store.OffsetStore
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.apache.rocketmq.remoting.RPCHook
;
public
class
DefaultLitePullConsumer
extends
ClientConfig
implements
LitePullConsumer
{
private
final
DefaultLitePullConsumerImpl
defaultLitePullConsumerImpl
;
/**
* Consumers belonging to the same consumer group share a group id. The consumers in a group then
* divides the topic as fairly amongst themselves as possible by establishing that each queue is only
* consumed by a single consumer from the group. If all consumers are from the same group, it functions
* as a traditional message queue. Each message would be consumed by one consumer of the group only.
* When multiple consumer groups exist, the flow of the data consumption model aligns with the traditional
* publish-subscribe model. The messages are broadcast to all consumer groups.
*/
private
String
consumerGroup
;
/**
* Long polling mode, the Consumer connection max suspend time, it is not recommended to modify
*/
private
long
brokerSuspendMaxTimeMillis
=
1000
*
20
;
/**
* Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
* recommended to modify
*/
private
long
consumerTimeoutMillisWhenSuspend
=
1000
*
30
;
/**
* The socket timeout in milliseconds
*/
private
long
consumerPullTimeoutMillis
=
1000
*
10
;
/**
* Consumption pattern,default is clustering
*/
private
MessageModel
messageModel
=
MessageModel
.
CLUSTERING
;
/**
* Message queue listener
*/
private
MessageQueueListener
messageQueueListener
;
/**
* Offset Storage
*/
private
OffsetStore
offsetStore
;
/**
* Queue allocation algorithm
*/
private
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
=
new
AllocateMessageQueueAveragely
();
/**
* Whether the unit of subscription group
*/
private
boolean
unitMode
=
false
;
/**
* The flag for auto commit offset
*/
private
boolean
autoCommit
=
true
;
/**
* Pull thread number
*/
private
int
pullThreadNums
=
20
;
/**
* Maximum commit offset interval time in milliseconds.
*/
private
long
autoCommitIntervalMillis
=
5
*
1000
;
/**
* Maximum number of messages pulled each time.
*/
private
int
pullBatchSize
=
10
;
/**
* Flow control threshold for consume request, each consumer will cache at most 10000 consume requests by default.
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private
long
pullThresholdForAll
=
10000
;
/**
* Consume max span offset.
*/
private
int
consumeMaxSpan
=
2000
;
/**
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
* the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private
int
pullThresholdForQueue
=
1000
;
/**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*
* <p>
* The size of a message only measured by message body, so it's not accurate
*/
private
int
pullThresholdSizeForQueue
=
100
;
/**
* The poll timeout in milliseconds
*/
private
long
pollTimeoutMillis
=
1000
*
5
;
/**
* Interval time in in milliseconds for checking changes in topic metadata.
*/
private
long
topicMetadataCheckIntervalMillis
=
30
*
1000
;
/**
* Default constructor.
*/
public
DefaultLitePullConsumer
()
{
this
(
null
,
MixAll
.
DEFAULT_CONSUMER_GROUP
,
null
);
}
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public
DefaultLitePullConsumer
(
final
String
consumerGroup
)
{
this
(
null
,
consumerGroup
,
null
);
}
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public
DefaultLitePullConsumer
(
RPCHook
rpcHook
)
{
this
(
null
,
MixAll
.
DEFAULT_CONSUMER_GROUP
,
rpcHook
);
}
/**
* Constructor specifying consumer group, RPC hook
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public
DefaultLitePullConsumer
(
final
String
consumerGroup
,
RPCHook
rpcHook
)
{
this
(
null
,
consumerGroup
,
rpcHook
);
}
/**
* Constructor specifying namespace, consumer group and RPC hook.
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public
DefaultLitePullConsumer
(
final
String
namespace
,
final
String
consumerGroup
,
RPCHook
rpcHook
)
{
this
.
namespace
=
namespace
;
this
.
consumerGroup
=
consumerGroup
;
defaultLitePullConsumerImpl
=
new
DefaultLitePullConsumerImpl
(
this
,
rpcHook
);
}
@Override
public
void
start
()
throws
MQClientException
{
this
.
defaultLitePullConsumerImpl
.
start
();
}
@Override
public
void
shutdown
()
{
this
.
defaultLitePullConsumerImpl
.
shutdown
();
}
@Override
public
void
subscribe
(
String
topic
,
String
subExpression
)
throws
MQClientException
{
this
.
defaultLitePullConsumerImpl
.
subscribe
(
withNamespace
(
topic
),
subExpression
);
}
@Override
public
void
subscribe
(
String
topic
,
MessageSelector
messageSelector
)
throws
MQClientException
{
this
.
defaultLitePullConsumerImpl
.
subscribe
(
withNamespace
(
topic
),
messageSelector
);
}
@Override
public
void
unsubscribe
(
String
topic
)
{
this
.
defaultLitePullConsumerImpl
.
unsubscribe
(
withNamespace
(
topic
));
}
@Override
public
void
assign
(
Collection
<
MessageQueue
>
messageQueues
)
{
defaultLitePullConsumerImpl
.
assign
(
queuesWithNamespace
(
messageQueues
));
}
@Override
public
List
<
MessageExt
>
poll
()
{
return
defaultLitePullConsumerImpl
.
poll
(
this
.
getPollTimeoutMillis
());
}
@Override
public
List
<
MessageExt
>
poll
(
long
timeout
)
{
return
defaultLitePullConsumerImpl
.
poll
(
timeout
);
}
@Override
public
void
seek
(
MessageQueue
messageQueue
,
long
offset
)
throws
MQClientException
{
this
.
defaultLitePullConsumerImpl
.
seek
(
queueWithNamespace
(
messageQueue
),
offset
);
}
@Override
public
void
pause
(
Collection
<
MessageQueue
>
messageQueues
)
{
this
.
defaultLitePullConsumerImpl
.
pause
(
queuesWithNamespace
(
messageQueues
));
}
@Override
public
void
resume
(
Collection
<
MessageQueue
>
messageQueues
)
{
this
.
defaultLitePullConsumerImpl
.
resume
(
queuesWithNamespace
(
messageQueues
));
}
@Override
public
Collection
<
MessageQueue
>
fetchMessageQueues
(
String
topic
)
throws
MQClientException
{
return
this
.
defaultLitePullConsumerImpl
.
fetchMessageQueues
(
withNamespace
(
topic
));
}
@Override
public
Long
offsetForTimestamp
(
MessageQueue
messageQueue
,
Long
timestamp
)
throws
MQClientException
{
return
this
.
defaultLitePullConsumerImpl
.
searchOffset
(
queueWithNamespace
(
messageQueue
),
timestamp
);
}
@Override
public
void
registerTopicMessageQueueChangeListener
(
String
topic
,
TopicMessageQueueChangeListener
topicMessageQueueChangeListener
)
throws
MQClientException
{
this
.
defaultLitePullConsumerImpl
.
registerTopicMessageQueueChangeListener
(
withNamespace
(
topic
),
topicMessageQueueChangeListener
);
}
@Override
public
void
commitSync
()
{
this
.
defaultLitePullConsumerImpl
.
commitSync
();
}
@Override
public
Long
committed
(
MessageQueue
messageQueue
)
throws
MQClientException
{
return
this
.
defaultLitePullConsumerImpl
.
committed
(
messageQueue
);
}
@Override
public
boolean
isAutoCommit
()
{
return
autoCommit
;
}
@Override
public
void
setAutoCommit
(
boolean
autoCommit
)
{
this
.
autoCommit
=
autoCommit
;
}
public
int
getPullThreadNums
()
{
return
pullThreadNums
;
}
public
void
setPullThreadNums
(
int
pullThreadNums
)
{
this
.
pullThreadNums
=
pullThreadNums
;
}
public
long
getAutoCommitIntervalMillis
()
{
return
autoCommitIntervalMillis
;
}
public
void
setAutoCommitIntervalMillis
(
long
autoCommitIntervalMillis
)
{
this
.
autoCommitIntervalMillis
=
autoCommitIntervalMillis
;
}
public
int
getPullBatchSize
()
{
return
pullBatchSize
;
}
public
void
setPullBatchSize
(
int
pullBatchSize
)
{
this
.
pullBatchSize
=
pullBatchSize
;
}
public
long
getPullThresholdForAll
()
{
return
pullThresholdForAll
;
}
public
void
setPullThresholdForAll
(
long
pullThresholdForAll
)
{
this
.
pullThresholdForAll
=
pullThresholdForAll
;
}
public
int
getConsumeMaxSpan
()
{
return
consumeMaxSpan
;
}
public
void
setConsumeMaxSpan
(
int
consumeMaxSpan
)
{
this
.
consumeMaxSpan
=
consumeMaxSpan
;
}
public
int
getPullThresholdForQueue
()
{
return
pullThresholdForQueue
;
}
public
void
setPullThresholdForQueue
(
int
pullThresholdForQueue
)
{
this
.
pullThresholdForQueue
=
pullThresholdForQueue
;
}
public
int
getPullThresholdSizeForQueue
()
{
return
pullThresholdSizeForQueue
;
}
public
void
setPullThresholdSizeForQueue
(
int
pullThresholdSizeForQueue
)
{
this
.
pullThresholdSizeForQueue
=
pullThresholdSizeForQueue
;
}
public
AllocateMessageQueueStrategy
getAllocateMessageQueueStrategy
()
{
return
allocateMessageQueueStrategy
;
}
public
void
setAllocateMessageQueueStrategy
(
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
)
{
this
.
allocateMessageQueueStrategy
=
allocateMessageQueueStrategy
;
}
public
long
getBrokerSuspendMaxTimeMillis
()
{
return
brokerSuspendMaxTimeMillis
;
}
public
long
getPollTimeoutMillis
()
{
return
pollTimeoutMillis
;
}
public
void
setPollTimeoutMillis
(
long
pollTimeoutMillis
)
{
this
.
pollTimeoutMillis
=
pollTimeoutMillis
;
}
public
OffsetStore
getOffsetStore
()
{
return
offsetStore
;
}
public
void
setOffsetStore
(
OffsetStore
offsetStore
)
{
this
.
offsetStore
=
offsetStore
;
}
public
boolean
isUnitMode
()
{
return
unitMode
;
}
public
void
setUnitMode
(
boolean
isUnitMode
)
{
this
.
unitMode
=
isUnitMode
;
}
public
MessageModel
getMessageModel
()
{
return
messageModel
;
}
public
void
setMessageModel
(
MessageModel
messageModel
)
{
this
.
messageModel
=
messageModel
;
}
public
String
getConsumerGroup
()
{
return
consumerGroup
;
}
public
MessageQueueListener
getMessageQueueListener
()
{
return
messageQueueListener
;
}
public
void
setMessageQueueListener
(
MessageQueueListener
messageQueueListener
)
{
this
.
messageQueueListener
=
messageQueueListener
;
}
public
long
getConsumerPullTimeoutMillis
()
{
return
consumerPullTimeoutMillis
;
}
public
void
setConsumerPullTimeoutMillis
(
long
consumerPullTimeoutMillis
)
{
this
.
consumerPullTimeoutMillis
=
consumerPullTimeoutMillis
;
}
public
long
getConsumerTimeoutMillisWhenSuspend
()
{
return
consumerTimeoutMillisWhenSuspend
;
}
public
void
setConsumerTimeoutMillisWhenSuspend
(
long
consumerTimeoutMillisWhenSuspend
)
{
this
.
consumerTimeoutMillisWhenSuspend
=
consumerTimeoutMillisWhenSuspend
;
}
public
long
getTopicMetadataCheckIntervalMillis
()
{
return
topicMetadataCheckIntervalMillis
;
}
public
void
setTopicMetadataCheckIntervalMillis
(
long
topicMetadataCheckIntervalMillis
)
{
this
.
topicMetadataCheckIntervalMillis
=
topicMetadataCheckIntervalMillis
;
}
}
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
浏览文件 @
9edeb4ec
...
...
@@ -35,9 +35,13 @@ import org.apache.rocketmq.remoting.RPCHook;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
/**
* Default pulling consumer
* Default pulling consumer.
* This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public
class
DefaultMQPullConsumer
extends
ClientConfig
implements
MQPullConsumer
{
protected
final
transient
DefaultMQPullConsumerImpl
defaultMQPullConsumerImpl
;
/**
...
...
client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.consumer
;
import
java.util.Collection
;
import
java.util.List
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
public
interface
LitePullConsumer
{
/**
* Start the consumer
*/
void
start
()
throws
MQClientException
;
/**
* Shutdown the consumer
*/
void
shutdown
();
/**
* Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
void
subscribe
(
final
String
topic
,
final
String
subExpression
)
throws
MQClientException
;
/**
* Subscribe some topic with selector.
*
* @param selector message selector({@link MessageSelector}), can be null.
* @throws MQClientException if there is any client error.
*/
void
subscribe
(
final
String
topic
,
final
MessageSelector
selector
)
throws
MQClientException
;
/**
* Unsubscribe consumption some topic
*
* @param topic Message topic that needs to be unsubscribe.
*/
void
unsubscribe
(
final
String
topic
);
/**
* Manually assign a list of message queues to this consumer. This interface does not allow for incremental
* assignment and will replace the previous assignment (if there is one).
*
* @param messageQueues Message queues that needs to be assigned.
*/
void
assign
(
Collection
<
MessageQueue
>
messageQueues
);
/**
* Fetch data for the topics or partitions specified using assign API
*
* @return list of message, can be null.
*/
List
<
MessageExt
>
poll
();
/**
* Fetch data for the topics or partitions specified using assign API
*
* @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
* negative
* @return list of message, can be null.
*/
List
<
MessageExt
>
poll
(
long
timeout
);
/**
* Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
* message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
* this API is arbitrarily used in the middle of consumption.
*
* @param messageQueue
* @param offset
*/
void
seek
(
MessageQueue
messageQueue
,
long
offset
)
throws
MQClientException
;
/**
* Suspend pulling from the requested message queues.
*
* Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
* messages of the requested message queues drain.
*
* Note that this method does not affect message queue subscription. In particular, it does not cause a group
* rebalance.
*
* @param messageQueues Message queues that needs to be paused.
*/
void
pause
(
Collection
<
MessageQueue
>
messageQueues
);
/**
* Resume specified message queues which have been paused with {@link #pause(Collection)}.
*
* @param messageQueues Message queues that needs to be resumed.
*/
void
resume
(
Collection
<
MessageQueue
>
messageQueues
);
/**
* Whether to enable auto-commit consume offset.
*
* @return true if enable auto-commit, false if disable auto-commit.
*/
boolean
isAutoCommit
();
/**
* Set whether to enable auto-commit consume offset.
*
* @param autoCommit Whether to enable auto-commit.
*/
void
setAutoCommit
(
boolean
autoCommit
);
/**
* Get metadata about the message queues for a given topic.
*
* @param topic The topic that need to get metadata.
* @return collection of message queues
* @throws MQClientException if there is any client error.
*/
Collection
<
MessageQueue
>
fetchMessageQueues
(
String
topic
)
throws
MQClientException
;
/**
* Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
* queue.
*
* @param messageQueue Message queues that needs to get offset by timestamp.
* @param timestamp
* @return offset
* @throws MQClientException if there is any client error.
*/
Long
offsetForTimestamp
(
MessageQueue
messageQueue
,
Long
timestamp
)
throws
MQClientException
;
/**
* Manually commit consume offset.
*/
void
commitSync
();
/**
* Get the last committed offset for the given message queue.
*
* @param messageQueue
* @return offset, if offset equals -1 means no offset in broker.
* @throws MQClientException if there is any client error.
*/
Long
committed
(
MessageQueue
messageQueue
)
throws
MQClientException
;
/**
* Register a callback for sensing topic metadata changes.
*
* @param topic The topic that need to monitor.
* @param topicMessageQueueChangeListener Callback when topic metadata changes, refer {@link
* TopicMessageQueueChangeListener}
* @throws MQClientException if there is any client error.
*/
void
registerTopicMessageQueueChangeListener
(
String
topic
,
TopicMessageQueueChangeListener
topicMessageQueueChangeListener
)
throws
MQClientException
;
}
client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
浏览文件 @
9edeb4ec
...
...
@@ -169,4 +169,5 @@ public interface MQPullConsumer extends MQConsumer {
*/
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
,
String
brokerName
,
String
consumerGroup
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
;
}
client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
浏览文件 @
9edeb4ec
...
...
@@ -32,7 +32,9 @@ import org.apache.rocketmq.logging.InternalLogger;
import
org.apache.rocketmq.remoting.RPCHook
;
/**
* Schedule service for pull consumer
* Schedule service for pull consumer.
* This Consumer will be removed in 2022, and a better implementation {@link
* DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
*/
public
class
MQPullConsumerScheduleService
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
...
...
@@ -157,7 +159,7 @@ public class MQPullConsumerScheduleService {
}
}
class
PullTaskImpl
implements
Runnable
{
public
class
PullTaskImpl
implements
Runnable
{
private
final
MessageQueue
messageQueue
;
private
volatile
boolean
cancelled
=
false
;
...
...
client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.consumer
;
import
java.util.Set
;
import
org.apache.rocketmq.common.message.MessageQueue
;
public
interface
TopicMessageQueueChangeListener
{
/**
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
* expanded or shrunk.
*
* @param messageQueues
*/
void
onChanged
(
String
topic
,
Set
<
MessageQueue
>
messageQueues
);
}
client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
浏览文件 @
9edeb4ec
...
...
@@ -117,25 +117,24 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
return
;
final
HashSet
<
MessageQueue
>
unusedMQ
=
new
HashSet
<
MessageQueue
>();
if
(!
mqs
.
isEmpty
())
{
for
(
Map
.
Entry
<
MessageQueue
,
AtomicLong
>
entry
:
this
.
offsetTable
.
entrySet
())
{
MessageQueue
mq
=
entry
.
getKey
();
AtomicLong
offset
=
entry
.
getValue
();
if
(
offset
!=
null
)
{
if
(
mqs
.
contains
(
mq
))
{
try
{
this
.
updateConsumeOffsetToBroker
(
mq
,
offset
.
get
());
log
.
info
(
"[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}"
,
this
.
groupName
,
this
.
mQClientFactory
.
getClientId
(),
mq
,
offset
.
get
());
}
catch
(
Exception
e
)
{
log
.
error
(
"updateConsumeOffsetToBroker exception, "
+
mq
.
toString
(),
e
);
}
}
else
{
unusedMQ
.
add
(
mq
);
for
(
Map
.
Entry
<
MessageQueue
,
AtomicLong
>
entry
:
this
.
offsetTable
.
entrySet
())
{
MessageQueue
mq
=
entry
.
getKey
();
AtomicLong
offset
=
entry
.
getValue
();
if
(
offset
!=
null
)
{
if
(
mqs
.
contains
(
mq
))
{
try
{
this
.
updateConsumeOffsetToBroker
(
mq
,
offset
.
get
());
log
.
info
(
"[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}"
,
this
.
groupName
,
this
.
mQClientFactory
.
getClientId
(),
mq
,
offset
.
get
());
}
catch
(
Exception
e
)
{
log
.
error
(
"updateConsumeOffsetToBroker exception, "
+
mq
.
toString
(),
e
);
}
}
else
{
unusedMQ
.
add
(
mq
);
}
}
}
...
...
@@ -187,8 +186,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
* Update the Consumer Offset in one way, once the Master is off, updated to Slave,
* here need to be optimized.
* Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
*/
private
void
updateConsumeOffsetToBroker
(
MessageQueue
mq
,
long
offset
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
...
...
@@ -196,15 +194,13 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
* here need to be optimized.
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/
@Override
public
void
updateConsumeOffsetToBroker
(
MessageQueue
mq
,
long
offset
,
boolean
isOneway
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
FindBrokerResult
findBrokerResult
=
this
.
mQClientFactory
.
findBrokerAddressInAdmin
(
mq
.
getBrokerName
());
if
(
null
==
findBrokerResult
)
{
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
mq
.
getTopic
());
findBrokerResult
=
this
.
mQClientFactory
.
findBrokerAddressInAdmin
(
mq
.
getBrokerName
());
}
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.impl.consumer
;
import
java.util.Collection
;
import
java.util.Iterator
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.apache.rocketmq.common.message.MessageQueue
;
public
class
AssignedMessageQueue
{
private
final
ConcurrentHashMap
<
MessageQueue
,
MessageQueueState
>
assignedMessageQueueState
;
private
RebalanceImpl
rebalanceImpl
;
public
AssignedMessageQueue
()
{
assignedMessageQueueState
=
new
ConcurrentHashMap
<
MessageQueue
,
MessageQueueState
>();
}
public
void
setRebalanceImpl
(
RebalanceImpl
rebalanceImpl
)
{
this
.
rebalanceImpl
=
rebalanceImpl
;
}
public
Set
<
MessageQueue
>
messageQueues
()
{
return
assignedMessageQueueState
.
keySet
();
}
public
boolean
isPaused
(
MessageQueue
messageQueue
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
messageQueueState
!=
null
)
{
return
messageQueueState
.
isPaused
();
}
return
true
;
}
public
void
pause
(
Collection
<
MessageQueue
>
messageQueues
)
{
for
(
MessageQueue
messageQueue
:
messageQueues
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
assignedMessageQueueState
.
get
(
messageQueue
)
!=
null
)
{
messageQueueState
.
setPaused
(
true
);
}
}
}
public
void
resume
(
Collection
<
MessageQueue
>
messageQueueCollection
)
{
for
(
MessageQueue
messageQueue
:
messageQueueCollection
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
assignedMessageQueueState
.
get
(
messageQueue
)
!=
null
)
{
messageQueueState
.
setPaused
(
false
);
}
}
}
public
ProcessQueue
getProcessQueue
(
MessageQueue
messageQueue
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
messageQueueState
!=
null
)
{
return
messageQueueState
.
getProcessQueue
();
}
return
null
;
}
public
long
getPullOffset
(
MessageQueue
messageQueue
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
messageQueueState
!=
null
)
{
return
messageQueueState
.
getPullOffset
();
}
return
-
1
;
}
public
void
updatePullOffset
(
MessageQueue
messageQueue
,
long
offset
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
messageQueueState
!=
null
)
{
messageQueueState
.
setPullOffset
(
offset
);
}
}
public
long
getConusmerOffset
(
MessageQueue
messageQueue
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
messageQueueState
!=
null
)
{
return
messageQueueState
.
getConsumeOffset
();
}
return
-
1
;
}
public
void
updateConsumeOffset
(
MessageQueue
messageQueue
,
long
offset
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
messageQueueState
!=
null
)
{
messageQueueState
.
setConsumeOffset
(
offset
);
}
}
public
void
setSeekOffset
(
MessageQueue
messageQueue
,
long
offset
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
messageQueueState
!=
null
)
{
messageQueueState
.
setSeekOffset
(
offset
);
}
}
public
long
getSeekOffset
(
MessageQueue
messageQueue
)
{
MessageQueueState
messageQueueState
=
assignedMessageQueueState
.
get
(
messageQueue
);
if
(
messageQueueState
!=
null
)
{
return
messageQueueState
.
getSeekOffset
();
}
return
-
1
;
}
public
void
updateAssignedMessageQueue
(
String
topic
,
Collection
<
MessageQueue
>
assigned
)
{
synchronized
(
this
.
assignedMessageQueueState
)
{
Iterator
<
Map
.
Entry
<
MessageQueue
,
MessageQueueState
>>
it
=
this
.
assignedMessageQueueState
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
Map
.
Entry
<
MessageQueue
,
MessageQueueState
>
next
=
it
.
next
();
if
(
next
.
getKey
().
getTopic
().
equals
(
topic
))
{
if
(!
assigned
.
contains
(
next
.
getKey
()))
{
next
.
getValue
().
getProcessQueue
().
setDropped
(
true
);
it
.
remove
();
}
}
}
addAssignedMessageQueue
(
assigned
);
}
}
public
void
updateAssignedMessageQueue
(
Collection
<
MessageQueue
>
assigned
)
{
synchronized
(
this
.
assignedMessageQueueState
)
{
Iterator
<
Map
.
Entry
<
MessageQueue
,
MessageQueueState
>>
it
=
this
.
assignedMessageQueueState
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
Map
.
Entry
<
MessageQueue
,
MessageQueueState
>
next
=
it
.
next
();
if
(!
assigned
.
contains
(
next
.
getKey
()))
{
next
.
getValue
().
getProcessQueue
().
setDropped
(
true
);
it
.
remove
();
}
}
addAssignedMessageQueue
(
assigned
);
}
}
private
void
addAssignedMessageQueue
(
Collection
<
MessageQueue
>
assigned
)
{
for
(
MessageQueue
messageQueue
:
assigned
)
{
if
(!
this
.
assignedMessageQueueState
.
containsKey
(
messageQueue
))
{
MessageQueueState
messageQueueState
;
if
(
rebalanceImpl
!=
null
&&
rebalanceImpl
.
getProcessQueueTable
().
get
(
messageQueue
)
!=
null
)
{
messageQueueState
=
new
MessageQueueState
(
messageQueue
,
rebalanceImpl
.
getProcessQueueTable
().
get
(
messageQueue
));
}
else
{
ProcessQueue
processQueue
=
new
ProcessQueue
();
messageQueueState
=
new
MessageQueueState
(
messageQueue
,
processQueue
);
}
this
.
assignedMessageQueueState
.
put
(
messageQueue
,
messageQueueState
);
}
}
}
public
void
removeAssignedMessageQueue
(
String
topic
)
{
synchronized
(
this
.
assignedMessageQueueState
)
{
Iterator
<
Map
.
Entry
<
MessageQueue
,
MessageQueueState
>>
it
=
this
.
assignedMessageQueueState
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
Map
.
Entry
<
MessageQueue
,
MessageQueueState
>
next
=
it
.
next
();
if
(
next
.
getKey
().
getTopic
().
equals
(
topic
))
{
it
.
remove
();
}
}
}
}
private
class
MessageQueueState
{
private
MessageQueue
messageQueue
;
private
ProcessQueue
processQueue
;
private
volatile
boolean
paused
=
false
;
private
volatile
long
pullOffset
=
-
1
;
private
volatile
long
consumeOffset
=
-
1
;
private
volatile
long
seekOffset
=
-
1
;
private
MessageQueueState
(
MessageQueue
messageQueue
,
ProcessQueue
processQueue
)
{
this
.
messageQueue
=
messageQueue
;
this
.
processQueue
=
processQueue
;
}
public
MessageQueue
getMessageQueue
()
{
return
messageQueue
;
}
public
void
setMessageQueue
(
MessageQueue
messageQueue
)
{
this
.
messageQueue
=
messageQueue
;
}
public
boolean
isPaused
()
{
return
paused
;
}
public
void
setPaused
(
boolean
paused
)
{
this
.
paused
=
paused
;
}
public
long
getPullOffset
()
{
return
pullOffset
;
}
public
void
setPullOffset
(
long
pullOffset
)
{
this
.
pullOffset
=
pullOffset
;
}
public
ProcessQueue
getProcessQueue
()
{
return
processQueue
;
}
public
void
setProcessQueue
(
ProcessQueue
processQueue
)
{
this
.
processQueue
=
processQueue
;
}
public
long
getConsumeOffset
()
{
return
consumeOffset
;
}
public
void
setConsumeOffset
(
long
consumeOffset
)
{
this
.
consumeOffset
=
consumeOffset
;
}
public
long
getSeekOffset
()
{
return
seekOffset
;
}
public
void
setSeekOffset
(
long
seekOffset
)
{
this
.
seekOffset
=
seekOffset
;
}
}
}
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.impl.consumer
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.ArrayList
;
import
java.util.Iterator
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.Collection
;
import
java.util.HashSet
;
import
java.util.Properties
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.ScheduledThreadPoolExecutor
;
import
java.util.concurrent.ThreadFactory
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.rocketmq.client.Validators
;
import
org.apache.rocketmq.client.consumer.DefaultLitePullConsumer
;
import
org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener
;
import
org.apache.rocketmq.client.consumer.MessageSelector
;
import
org.apache.rocketmq.client.consumer.MessageQueueListener
;
import
org.apache.rocketmq.client.consumer.PullResult
;
import
org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore
;
import
org.apache.rocketmq.client.consumer.store.OffsetStore
;
import
org.apache.rocketmq.client.consumer.store.ReadOffsetType
;
import
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.hook.FilterMessageHook
;
import
org.apache.rocketmq.client.impl.CommunicationMode
;
import
org.apache.rocketmq.client.impl.MQClientManager
;
import
org.apache.rocketmq.client.impl.factory.MQClientInstance
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.ServiceState
;
import
org.apache.rocketmq.common.ThreadFactoryImpl
;
import
org.apache.rocketmq.common.consumer.ConsumeFromWhere
;
import
org.apache.rocketmq.common.filter.ExpressionType
;
import
org.apache.rocketmq.common.filter.FilterAPI
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.NamespaceUtil
;
import
org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumeType
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData
;
import
org.apache.rocketmq.common.sysflag.PullSysFlag
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
public
class
DefaultLitePullConsumerImpl
implements
MQConsumerInner
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
private
final
long
consumerStartTimestamp
=
System
.
currentTimeMillis
();
private
final
RPCHook
rpcHook
;
private
final
ArrayList
<
FilterMessageHook
>
filterMessageHookList
=
new
ArrayList
<
FilterMessageHook
>();
private
volatile
ServiceState
serviceState
=
ServiceState
.
CREATE_JUST
;
protected
MQClientInstance
mQClientFactory
;
private
PullAPIWrapper
pullAPIWrapper
;
private
OffsetStore
offsetStore
;
private
RebalanceImpl
rebalanceImpl
=
new
RebalanceLitePullImpl
(
this
);
private
enum
SubscriptionType
{
NONE
,
SUBSCRIBE
,
ASSIGN
}
private
static
final
String
NOT_RUNNING_EXCEPTION_MESSAGE
=
"The consumer not running, please start it first."
;
private
static
final
String
SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE
=
"Subscribe and assign are mutually exclusive."
;
/**
* the type of subscription
*/
private
SubscriptionType
subscriptionType
=
SubscriptionType
.
NONE
;
/**
* Delay some time when exception occur
*/
private
static
final
long
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION
=
1000
;
/**
* Flow control interval
*/
private
static
final
long
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
=
50
;
/**
* Delay some time when suspend pull service
*/
private
static
final
long
PULL_TIME_DELAY_MILLS_WHEN_PAUSE
=
1000
;
private
DefaultLitePullConsumer
defaultLitePullConsumer
;
private
final
ConcurrentMap
<
MessageQueue
,
PullTaskImpl
>
taskTable
=
new
ConcurrentHashMap
<
MessageQueue
,
PullTaskImpl
>();
private
AssignedMessageQueue
assignedMessageQueue
=
new
AssignedMessageQueue
();
private
final
BlockingQueue
<
ConsumeRequest
>
consumeRequestCache
=
new
LinkedBlockingQueue
<
ConsumeRequest
>();
private
ScheduledThreadPoolExecutor
scheduledThreadPoolExecutor
;
private
final
ScheduledExecutorService
scheduledExecutorService
;
private
Map
<
String
,
TopicMessageQueueChangeListener
>
topicMessageQueueChangeListenerMap
=
new
HashMap
<
String
,
TopicMessageQueueChangeListener
>();
private
Map
<
String
,
Set
<
MessageQueue
>>
messageQueuesForTopic
=
new
HashMap
<
String
,
Set
<
MessageQueue
>>();
private
long
consumeRequestFlowControlTimes
=
0L
;
private
long
queueFlowControlTimes
=
0L
;
private
long
queueMaxSpanFlowControlTimes
=
0L
;
private
long
nextAutoCommitDeadline
=
-
1L
;
private
final
MessageQueueLock
messageQueueLock
=
new
MessageQueueLock
();
public
DefaultLitePullConsumerImpl
(
final
DefaultLitePullConsumer
defaultLitePullConsumer
,
final
RPCHook
rpcHook
)
{
this
.
defaultLitePullConsumer
=
defaultLitePullConsumer
;
this
.
rpcHook
=
rpcHook
;
this
.
scheduledThreadPoolExecutor
=
new
ScheduledThreadPoolExecutor
(
this
.
defaultLitePullConsumer
.
getPullThreadNums
(),
new
ThreadFactoryImpl
(
"PullMsgThread-"
+
this
.
defaultLitePullConsumer
.
getConsumerGroup
())
);
this
.
scheduledExecutorService
=
Executors
.
newSingleThreadScheduledExecutor
(
new
ThreadFactory
()
{
@Override
public
Thread
newThread
(
Runnable
r
)
{
return
new
Thread
(
r
,
"MonitorMessageQueueChangeThread"
);
}
});
}
private
void
checkServiceState
()
{
if
(
this
.
serviceState
!=
ServiceState
.
RUNNING
)
throw
new
IllegalStateException
(
NOT_RUNNING_EXCEPTION_MESSAGE
);
}
private
synchronized
void
setSubscriptionType
(
SubscriptionType
type
)
{
if
(
this
.
subscriptionType
==
SubscriptionType
.
NONE
)
this
.
subscriptionType
=
type
;
else
if
(
this
.
subscriptionType
!=
type
)
throw
new
IllegalStateException
(
SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE
);
}
private
void
updateAssignedMessageQueue
(
String
topic
,
Set
<
MessageQueue
>
assignedMessageQueue
)
{
this
.
assignedMessageQueue
.
updateAssignedMessageQueue
(
topic
,
assignedMessageQueue
);
}
private
void
updatePullTask
(
String
topic
,
Set
<
MessageQueue
>
mqNewSet
)
{
Iterator
<
Map
.
Entry
<
MessageQueue
,
PullTaskImpl
>>
it
=
this
.
taskTable
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
Map
.
Entry
<
MessageQueue
,
PullTaskImpl
>
next
=
it
.
next
();
if
(
next
.
getKey
().
getTopic
().
equals
(
topic
))
{
if
(!
mqNewSet
.
contains
(
next
.
getKey
()))
{
next
.
getValue
().
setCancelled
(
true
);
it
.
remove
();
}
}
}
startPullTask
(
mqNewSet
);
}
class
MessageQueueListenerImpl
implements
MessageQueueListener
{
@Override
public
void
messageQueueChanged
(
String
topic
,
Set
<
MessageQueue
>
mqAll
,
Set
<
MessageQueue
>
mqDivided
)
{
MessageModel
messageModel
=
defaultLitePullConsumer
.
getMessageModel
();
switch
(
messageModel
)
{
case
BROADCASTING:
updateAssignedMessageQueue
(
topic
,
mqAll
);
updatePullTask
(
topic
,
mqAll
);
break
;
case
CLUSTERING:
updateAssignedMessageQueue
(
topic
,
mqDivided
);
updatePullTask
(
topic
,
mqDivided
);
break
;
default
:
break
;
}
}
}
private
int
nextPullBatchSize
()
{
return
Math
.
min
(
this
.
defaultLitePullConsumer
.
getPullBatchSize
(),
consumeRequestCache
.
remainingCapacity
());
}
public
synchronized
void
shutdown
()
{
switch
(
this
.
serviceState
)
{
case
CREATE_JUST:
break
;
case
RUNNING:
persistConsumerOffset
();
this
.
mQClientFactory
.
unregisterConsumer
(
this
.
defaultLitePullConsumer
.
getConsumerGroup
());
scheduledThreadPoolExecutor
.
shutdown
();
scheduledExecutorService
.
shutdown
();
this
.
mQClientFactory
.
shutdown
();
this
.
serviceState
=
ServiceState
.
SHUTDOWN_ALREADY
;
log
.
info
(
"the consumer [{}] shutdown OK"
,
this
.
defaultLitePullConsumer
.
getConsumerGroup
());
break
;
default
:
break
;
}
}
public
synchronized
void
start
()
throws
MQClientException
{
switch
(
this
.
serviceState
)
{
case
CREATE_JUST:
this
.
serviceState
=
ServiceState
.
START_FAILED
;
this
.
checkConfig
();
if
(
this
.
defaultLitePullConsumer
.
getMessageModel
()
==
MessageModel
.
CLUSTERING
)
{
this
.
defaultLitePullConsumer
.
changeInstanceNameToPID
();
}
initMQClientFactory
();
initRebalanceImpl
();
initPullAPIWrapper
();
initOffsetStore
();
mQClientFactory
.
start
();
startScheduleTask
();
this
.
serviceState
=
ServiceState
.
RUNNING
;
log
.
info
(
"the consumer [{}] start OK"
,
this
.
defaultLitePullConsumer
.
getConsumerGroup
());
operateAfterRunning
();
break
;
case
RUNNING:
case
START_FAILED:
case
SHUTDOWN_ALREADY:
throw
new
MQClientException
(
"The PullConsumer service state not OK, maybe started once, "
+
this
.
serviceState
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
CLIENT_SERVICE_NOT_OK
),
null
);
default
:
break
;
}
}
private
void
initMQClientFactory
()
throws
MQClientException
{
this
.
mQClientFactory
=
MQClientManager
.
getInstance
().
getAndCreateMQClientInstance
(
this
.
defaultLitePullConsumer
,
this
.
rpcHook
);
boolean
registerOK
=
mQClientFactory
.
registerConsumer
(
this
.
defaultLitePullConsumer
.
getConsumerGroup
(),
this
);
if
(!
registerOK
)
{
this
.
serviceState
=
ServiceState
.
CREATE_JUST
;
throw
new
MQClientException
(
"The consumer group["
+
this
.
defaultLitePullConsumer
.
getConsumerGroup
()
+
"] has been created before, specify another name please."
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
GROUP_NAME_DUPLICATE_URL
),
null
);
}
}
private
void
initRebalanceImpl
()
{
this
.
rebalanceImpl
.
setConsumerGroup
(
this
.
defaultLitePullConsumer
.
getConsumerGroup
());
this
.
rebalanceImpl
.
setMessageModel
(
this
.
defaultLitePullConsumer
.
getMessageModel
());
this
.
rebalanceImpl
.
setAllocateMessageQueueStrategy
(
this
.
defaultLitePullConsumer
.
getAllocateMessageQueueStrategy
());
this
.
rebalanceImpl
.
setmQClientFactory
(
this
.
mQClientFactory
);
}
private
void
initPullAPIWrapper
()
{
this
.
pullAPIWrapper
=
new
PullAPIWrapper
(
mQClientFactory
,
this
.
defaultLitePullConsumer
.
getConsumerGroup
(),
isUnitMode
());
this
.
pullAPIWrapper
.
registerFilterMessageHook
(
filterMessageHookList
);
}
private
void
initOffsetStore
()
throws
MQClientException
{
if
(
this
.
defaultLitePullConsumer
.
getOffsetStore
()
!=
null
)
{
this
.
offsetStore
=
this
.
defaultLitePullConsumer
.
getOffsetStore
();
}
else
{
switch
(
this
.
defaultLitePullConsumer
.
getMessageModel
())
{
case
BROADCASTING:
this
.
offsetStore
=
new
LocalFileOffsetStore
(
this
.
mQClientFactory
,
this
.
defaultLitePullConsumer
.
getConsumerGroup
());
break
;
case
CLUSTERING:
this
.
offsetStore
=
new
RemoteBrokerOffsetStore
(
this
.
mQClientFactory
,
this
.
defaultLitePullConsumer
.
getConsumerGroup
());
break
;
default
:
break
;
}
this
.
defaultLitePullConsumer
.
setOffsetStore
(
this
.
offsetStore
);
}
this
.
offsetStore
.
load
();
}
private
void
startScheduleTask
()
{
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
public
void
run
()
{
try
{
fetchTopicMessageQueuesAndCompare
();
}
catch
(
Exception
e
)
{
log
.
error
(
"ScheduledTask fetchMessageQueuesAndCompare exception"
,
e
);
}
}
},
1000
*
10
,
this
.
getDefaultLitePullConsumer
().
getTopicMetadataCheckIntervalMillis
(),
TimeUnit
.
MILLISECONDS
);
}
private
void
operateAfterRunning
()
throws
MQClientException
{
// If subscribe function invoke before start function, then update topic subscribe info after initialization.
if
(
subscriptionType
==
SubscriptionType
.
SUBSCRIBE
)
{
updateTopicSubscribeInfoWhenSubscriptionChanged
();
}
// If assign function invoke before start function, then update pull task after initialization.
if
(
subscriptionType
==
SubscriptionType
.
ASSIGN
)
{
updateAssignPullTask
(
assignedMessageQueue
.
messageQueues
());
}
for
(
String
topic
:
topicMessageQueueChangeListenerMap
.
keySet
())
{
Set
<
MessageQueue
>
messageQueues
=
fetchMessageQueues
(
topic
);
messageQueuesForTopic
.
put
(
topic
,
messageQueues
);
}
this
.
mQClientFactory
.
checkClientInBroker
();
}
private
void
checkConfig
()
throws
MQClientException
{
// Check consumerGroup
Validators
.
checkGroup
(
this
.
defaultLitePullConsumer
.
getConsumerGroup
());
// Check consumerGroup name is not equal default consumer group name.
if
(
this
.
defaultLitePullConsumer
.
getConsumerGroup
().
equals
(
MixAll
.
DEFAULT_CONSUMER_GROUP
))
{
throw
new
MQClientException
(
"consumerGroup can not equal "
+
MixAll
.
DEFAULT_CONSUMER_GROUP
+
", please specify another one."
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
CLIENT_PARAMETER_CHECK_URL
),
null
);
}
// Check messageModel is not null.
if
(
null
==
this
.
defaultLitePullConsumer
.
getMessageModel
())
{
throw
new
MQClientException
(
"messageModel is null"
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
CLIENT_PARAMETER_CHECK_URL
),
null
);
}
// Check allocateMessageQueueStrategy is not null
if
(
null
==
this
.
defaultLitePullConsumer
.
getAllocateMessageQueueStrategy
())
{
throw
new
MQClientException
(
"allocateMessageQueueStrategy is null"
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
CLIENT_PARAMETER_CHECK_URL
),
null
);
}
if
(
this
.
defaultLitePullConsumer
.
getConsumerTimeoutMillisWhenSuspend
()
<
this
.
defaultLitePullConsumer
.
getBrokerSuspendMaxTimeMillis
())
{
throw
new
MQClientException
(
"Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
CLIENT_PARAMETER_CHECK_URL
),
null
);
}
}
public
PullAPIWrapper
getPullAPIWrapper
()
{
return
pullAPIWrapper
;
}
private
void
startPullTask
(
Collection
<
MessageQueue
>
mqSet
)
{
for
(
MessageQueue
messageQueue
:
mqSet
)
{
if
(!
this
.
taskTable
.
containsKey
(
messageQueue
))
{
PullTaskImpl
pullTask
=
new
PullTaskImpl
(
messageQueue
);
this
.
taskTable
.
put
(
messageQueue
,
pullTask
);
this
.
scheduledThreadPoolExecutor
.
schedule
(
pullTask
,
0
,
TimeUnit
.
MILLISECONDS
);
}
}
}
private
void
updateAssignPullTask
(
Collection
<
MessageQueue
>
mqNewSet
)
{
Iterator
<
Map
.
Entry
<
MessageQueue
,
PullTaskImpl
>>
it
=
this
.
taskTable
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
Map
.
Entry
<
MessageQueue
,
PullTaskImpl
>
next
=
it
.
next
();
if
(!
mqNewSet
.
contains
(
next
.
getKey
()))
{
next
.
getValue
().
setCancelled
(
true
);
it
.
remove
();
}
}
startPullTask
(
mqNewSet
);
}
private
void
updateTopicSubscribeInfoWhenSubscriptionChanged
()
{
Map
<
String
,
SubscriptionData
>
subTable
=
rebalanceImpl
.
getSubscriptionInner
();
if
(
subTable
!=
null
)
{
for
(
final
Map
.
Entry
<
String
,
SubscriptionData
>
entry
:
subTable
.
entrySet
())
{
final
String
topic
=
entry
.
getKey
();
this
.
mQClientFactory
.
updateTopicRouteInfoFromNameServer
(
topic
);
}
}
}
public
synchronized
void
subscribe
(
String
topic
,
String
subExpression
)
throws
MQClientException
{
try
{
if
(
topic
==
null
||
topic
.
equals
(
""
))
{
throw
new
IllegalArgumentException
(
"Topic can not be null or empty."
);
}
setSubscriptionType
(
SubscriptionType
.
SUBSCRIBE
);
SubscriptionData
subscriptionData
=
FilterAPI
.
buildSubscriptionData
(
defaultLitePullConsumer
.
getConsumerGroup
(),
topic
,
subExpression
);
this
.
rebalanceImpl
.
getSubscriptionInner
().
put
(
topic
,
subscriptionData
);
this
.
defaultLitePullConsumer
.
setMessageQueueListener
(
new
MessageQueueListenerImpl
());
assignedMessageQueue
.
setRebalanceImpl
(
this
.
rebalanceImpl
);
if
(
serviceState
==
ServiceState
.
RUNNING
)
{
this
.
mQClientFactory
.
sendHeartbeatToAllBrokerWithLock
();
updateTopicSubscribeInfoWhenSubscriptionChanged
();
}
}
catch
(
Exception
e
)
{
throw
new
MQClientException
(
"subscribe exception"
,
e
);
}
}
public
synchronized
void
subscribe
(
String
topic
,
MessageSelector
messageSelector
)
throws
MQClientException
{
try
{
if
(
topic
==
null
||
topic
.
equals
(
""
))
{
throw
new
IllegalArgumentException
(
"Topic can not be null or empty."
);
}
setSubscriptionType
(
SubscriptionType
.
SUBSCRIBE
);
if
(
messageSelector
==
null
)
{
subscribe
(
topic
,
SubscriptionData
.
SUB_ALL
);
return
;
}
SubscriptionData
subscriptionData
=
FilterAPI
.
build
(
topic
,
messageSelector
.
getExpression
(),
messageSelector
.
getExpressionType
());
this
.
rebalanceImpl
.
getSubscriptionInner
().
put
(
topic
,
subscriptionData
);
this
.
defaultLitePullConsumer
.
setMessageQueueListener
(
new
MessageQueueListenerImpl
());
assignedMessageQueue
.
setRebalanceImpl
(
this
.
rebalanceImpl
);
if
(
serviceState
==
ServiceState
.
RUNNING
)
{
this
.
mQClientFactory
.
sendHeartbeatToAllBrokerWithLock
();
updateTopicSubscribeInfoWhenSubscriptionChanged
();
}
}
catch
(
Exception
e
)
{
throw
new
MQClientException
(
"subscribe exception"
,
e
);
}
}
public
synchronized
void
unsubscribe
(
final
String
topic
)
{
this
.
rebalanceImpl
.
getSubscriptionInner
().
remove
(
topic
);
removePullTaskCallback
(
topic
);
assignedMessageQueue
.
removeAssignedMessageQueue
(
topic
);
}
public
synchronized
void
assign
(
Collection
<
MessageQueue
>
messageQueues
)
{
if
(
messageQueues
==
null
||
messageQueues
.
isEmpty
())
{
throw
new
IllegalArgumentException
(
"Message queues can not be null or empty."
);
}
setSubscriptionType
(
SubscriptionType
.
ASSIGN
);
assignedMessageQueue
.
updateAssignedMessageQueue
(
messageQueues
);
if
(
serviceState
==
ServiceState
.
RUNNING
)
{
updateAssignPullTask
(
messageQueues
);
}
}
private
void
maybeAutoCommit
()
{
long
now
=
System
.
currentTimeMillis
();
if
(
now
>=
nextAutoCommitDeadline
)
{
commitAll
();
nextAutoCommitDeadline
=
now
+
defaultLitePullConsumer
.
getAutoCommitIntervalMillis
();
}
}
public
synchronized
List
<
MessageExt
>
poll
(
long
timeout
)
{
try
{
checkServiceState
();
if
(
timeout
<
0
)
throw
new
IllegalArgumentException
(
"Timeout must not be negative"
);
if
(
defaultLitePullConsumer
.
isAutoCommit
())
{
maybeAutoCommit
();
}
long
endTime
=
System
.
currentTimeMillis
()
+
timeout
;
ConsumeRequest
consumeRequest
=
consumeRequestCache
.
poll
(
endTime
-
System
.
currentTimeMillis
(),
TimeUnit
.
MILLISECONDS
);
if
(
endTime
-
System
.
currentTimeMillis
()
>
0
)
{
while
(
consumeRequest
!=
null
&&
consumeRequest
.
getProcessQueue
().
isDropped
())
{
consumeRequest
=
consumeRequestCache
.
poll
(
endTime
-
System
.
currentTimeMillis
(),
TimeUnit
.
MILLISECONDS
);
if
(
endTime
-
System
.
currentTimeMillis
()
<=
0
)
break
;
}
}
if
(
consumeRequest
!=
null
&&
!
consumeRequest
.
getProcessQueue
().
isDropped
())
{
List
<
MessageExt
>
messages
=
consumeRequest
.
getMessageExts
();
long
offset
=
consumeRequest
.
getProcessQueue
().
removeMessage
(
messages
);
assignedMessageQueue
.
updateConsumeOffset
(
consumeRequest
.
getMessageQueue
(),
offset
);
//If namespace not null , reset Topic without namespace.
this
.
resetTopic
(
messages
);
return
messages
;
}
}
catch
(
InterruptedException
ignore
)
{
}
return
Collections
.
emptyList
();
}
public
void
pause
(
Collection
<
MessageQueue
>
messageQueues
)
{
assignedMessageQueue
.
pause
(
messageQueues
);
}
public
void
resume
(
Collection
<
MessageQueue
>
messageQueues
)
{
assignedMessageQueue
.
resume
(
messageQueues
);
}
public
synchronized
void
seek
(
MessageQueue
messageQueue
,
long
offset
)
throws
MQClientException
{
if
(!
assignedMessageQueue
.
messageQueues
().
contains
(
messageQueue
))
{
if
(
subscriptionType
==
SubscriptionType
.
SUBSCRIBE
)
{
throw
new
MQClientException
(
"The message queue is not in assigned list, may be rebalancing, message queue: "
+
messageQueue
,
null
);
}
else
{
throw
new
MQClientException
(
"The message queue is not in assigned list, message queue: "
+
messageQueue
,
null
);
}
}
long
minOffset
=
minOffset
(
messageQueue
);
long
maxOffset
=
maxOffset
(
messageQueue
);
if
(
offset
<
minOffset
||
offset
>
maxOffset
)
{
throw
new
MQClientException
(
"Seek offset illegal, seek offset = "
+
offset
+
", min offset = "
+
minOffset
+
", max offset = "
+
maxOffset
,
null
);
}
final
Object
objLock
=
messageQueueLock
.
fetchLockObject
(
messageQueue
);
synchronized
(
objLock
)
{
assignedMessageQueue
.
setSeekOffset
(
messageQueue
,
offset
);
clearMessageQueueInCache
(
messageQueue
);
}
}
private
long
maxOffset
(
MessageQueue
messageQueue
)
throws
MQClientException
{
checkServiceState
();
return
this
.
mQClientFactory
.
getMQAdminImpl
().
maxOffset
(
messageQueue
);
}
private
long
minOffset
(
MessageQueue
messageQueue
)
throws
MQClientException
{
checkServiceState
();
return
this
.
mQClientFactory
.
getMQAdminImpl
().
minOffset
(
messageQueue
);
}
private
void
removePullTaskCallback
(
final
String
topic
)
{
removePullTask
(
topic
);
}
private
void
removePullTask
(
final
String
topic
)
{
Iterator
<
Map
.
Entry
<
MessageQueue
,
PullTaskImpl
>>
it
=
this
.
taskTable
.
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
Map
.
Entry
<
MessageQueue
,
PullTaskImpl
>
next
=
it
.
next
();
if
(
next
.
getKey
().
getTopic
().
equals
(
topic
))
{
next
.
getValue
().
setCancelled
(
true
);
it
.
remove
();
}
}
}
public
synchronized
void
commitSync
()
{
try
{
for
(
MessageQueue
messageQueue
:
assignedMessageQueue
.
messageQueues
())
{
long
consumerOffset
=
assignedMessageQueue
.
getConusmerOffset
(
messageQueue
);
if
(
consumerOffset
!=
-
1
)
{
ProcessQueue
processQueue
=
assignedMessageQueue
.
getProcessQueue
(
messageQueue
);
long
preConsumerOffset
=
this
.
getOffsetStore
().
readOffset
(
messageQueue
,
ReadOffsetType
.
READ_FROM_MEMORY
);
if
(
processQueue
!=
null
&&
!
processQueue
.
isDropped
()
&&
consumerOffset
!=
preConsumerOffset
)
{
updateConsumeOffset
(
messageQueue
,
consumerOffset
);
updateConsumeOffsetToBroker
(
messageQueue
,
consumerOffset
,
false
);
}
}
}
if
(
defaultLitePullConsumer
.
getMessageModel
()
==
MessageModel
.
BROADCASTING
)
{
offsetStore
.
persistAll
(
assignedMessageQueue
.
messageQueues
());
}
}
catch
(
Exception
e
)
{
log
.
error
(
"An error occurred when update consume offset synchronously."
,
e
);
}
}
private
synchronized
void
commitAll
()
{
try
{
for
(
MessageQueue
messageQueue
:
assignedMessageQueue
.
messageQueues
())
{
long
consumerOffset
=
assignedMessageQueue
.
getConusmerOffset
(
messageQueue
);
if
(
consumerOffset
!=
-
1
)
{
ProcessQueue
processQueue
=
assignedMessageQueue
.
getProcessQueue
(
messageQueue
);
long
preConsumerOffset
=
this
.
getOffsetStore
().
readOffset
(
messageQueue
,
ReadOffsetType
.
READ_FROM_MEMORY
);
if
(
processQueue
!=
null
&&
!
processQueue
.
isDropped
()
&&
consumerOffset
!=
preConsumerOffset
)
{
updateConsumeOffset
(
messageQueue
,
consumerOffset
);
updateConsumeOffsetToBroker
(
messageQueue
,
consumerOffset
,
true
);
}
}
}
if
(
defaultLitePullConsumer
.
getMessageModel
()
==
MessageModel
.
BROADCASTING
)
{
offsetStore
.
persistAll
(
assignedMessageQueue
.
messageQueues
());
}
}
catch
(
Exception
e
)
{
log
.
error
(
"An error occurred when update consume offset Automatically."
);
}
}
private
void
updatePullOffset
(
MessageQueue
messageQueue
,
long
nextPullOffset
)
{
if
(
assignedMessageQueue
.
getSeekOffset
(
messageQueue
)
==
-
1
)
{
assignedMessageQueue
.
updatePullOffset
(
messageQueue
,
nextPullOffset
);
}
}
private
void
submitConsumeRequest
(
ConsumeRequest
consumeRequest
)
{
try
{
consumeRequestCache
.
put
(
consumeRequest
);
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"Submit consumeRequest error"
,
e
);
}
}
private
long
fetchConsumeOffset
(
MessageQueue
messageQueue
,
boolean
fromStore
)
{
checkServiceState
();
return
this
.
offsetStore
.
readOffset
(
messageQueue
,
fromStore
?
ReadOffsetType
.
READ_FROM_STORE
:
ReadOffsetType
.
MEMORY_FIRST_THEN_STORE
);
}
public
long
committed
(
MessageQueue
messageQueue
)
throws
MQClientException
{
checkServiceState
();
long
offset
=
this
.
offsetStore
.
readOffset
(
messageQueue
,
ReadOffsetType
.
READ_FROM_STORE
);
if
(
offset
==
-
2
)
throw
new
MQClientException
(
"Fetch consume offset from broker exception"
,
null
);
return
offset
;
}
private
void
clearMessageQueueInCache
(
MessageQueue
messageQueue
)
{
ProcessQueue
processQueue
=
assignedMessageQueue
.
getProcessQueue
(
messageQueue
);
if
(
processQueue
!=
null
)
{
processQueue
.
clear
();
}
Iterator
<
ConsumeRequest
>
iter
=
consumeRequestCache
.
iterator
();
while
(
iter
.
hasNext
())
{
if
(
iter
.
next
().
getMessageQueue
().
equals
(
messageQueue
))
iter
.
remove
();
}
}
private
long
nextPullOffset
(
MessageQueue
messageQueue
)
{
long
offset
=
-
1
;
long
seekOffset
=
assignedMessageQueue
.
getSeekOffset
(
messageQueue
);
if
(
seekOffset
!=
-
1
)
{
offset
=
seekOffset
;
assignedMessageQueue
.
updateConsumeOffset
(
messageQueue
,
offset
);
assignedMessageQueue
.
setSeekOffset
(
messageQueue
,
-
1
);
}
else
{
offset
=
assignedMessageQueue
.
getPullOffset
(
messageQueue
);
if
(
offset
==
-
1
)
{
offset
=
fetchConsumeOffset
(
messageQueue
,
false
);
if
(
offset
==
-
1
&&
defaultLitePullConsumer
.
getMessageModel
()
==
MessageModel
.
BROADCASTING
)
{
offset
=
0
;
}
}
assignedMessageQueue
.
updateConsumeOffset
(
messageQueue
,
offset
);
}
return
offset
;
}
public
long
searchOffset
(
MessageQueue
mq
,
long
timestamp
)
throws
MQClientException
{
checkServiceState
();
return
this
.
mQClientFactory
.
getMQAdminImpl
().
searchOffset
(
mq
,
timestamp
);
}
public
class
PullTaskImpl
implements
Runnable
{
private
final
MessageQueue
messageQueue
;
private
volatile
boolean
cancelled
=
false
;
public
PullTaskImpl
(
final
MessageQueue
messageQueue
)
{
this
.
messageQueue
=
messageQueue
;
}
@Override
public
void
run
()
{
if
(!
this
.
isCancelled
())
{
if
(
assignedMessageQueue
.
isPaused
(
messageQueue
))
{
scheduledThreadPoolExecutor
.
schedule
(
this
,
PULL_TIME_DELAY_MILLS_WHEN_PAUSE
,
TimeUnit
.
MILLISECONDS
);
log
.
debug
(
"Message Queue: {} has been paused!"
,
messageQueue
);
return
;
}
ProcessQueue
processQueue
=
assignedMessageQueue
.
getProcessQueue
(
messageQueue
);
if
(
processQueue
==
null
&&
processQueue
.
isDropped
())
{
log
.
info
(
"The message queue not be able to poll, because it's dropped. group={}, messageQueue={}"
,
defaultLitePullConsumer
.
getConsumerGroup
(),
this
.
messageQueue
);
return
;
}
if
(
consumeRequestCache
.
size
()
*
defaultLitePullConsumer
.
getPullBatchSize
()
>
defaultLitePullConsumer
.
getPullThresholdForAll
())
{
scheduledThreadPoolExecutor
.
schedule
(
this
,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
,
TimeUnit
.
MILLISECONDS
);
if
((
consumeRequestFlowControlTimes
++
%
1000
)
==
0
)
log
.
warn
(
"The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}"
,
consumeRequestCache
.
size
(),
consumeRequestFlowControlTimes
);
return
;
}
long
cachedMessageCount
=
processQueue
.
getMsgCount
().
get
();
long
cachedMessageSizeInMiB
=
processQueue
.
getMsgSize
().
get
()
/
(
1024
*
1024
);
if
(
cachedMessageCount
>
defaultLitePullConsumer
.
getPullThresholdForQueue
())
{
scheduledThreadPoolExecutor
.
schedule
(
this
,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
,
TimeUnit
.
MILLISECONDS
);
if
((
queueFlowControlTimes
++
%
1000
)
==
0
)
{
log
.
warn
(
"The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}"
,
defaultLitePullConsumer
.
getPullThresholdForQueue
(),
processQueue
.
getMsgTreeMap
().
firstKey
(),
processQueue
.
getMsgTreeMap
().
lastKey
(),
cachedMessageCount
,
cachedMessageSizeInMiB
,
queueFlowControlTimes
);
}
return
;
}
if
(
cachedMessageSizeInMiB
>
defaultLitePullConsumer
.
getPullThresholdSizeForQueue
())
{
scheduledThreadPoolExecutor
.
schedule
(
this
,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
,
TimeUnit
.
MILLISECONDS
);
if
((
queueFlowControlTimes
++
%
1000
)
==
0
)
{
log
.
warn
(
"The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}"
,
defaultLitePullConsumer
.
getPullThresholdSizeForQueue
(),
processQueue
.
getMsgTreeMap
().
firstKey
(),
processQueue
.
getMsgTreeMap
().
lastKey
(),
cachedMessageCount
,
cachedMessageSizeInMiB
,
queueFlowControlTimes
);
}
return
;
}
if
(
processQueue
.
getMaxSpan
()
>
defaultLitePullConsumer
.
getConsumeMaxSpan
())
{
scheduledThreadPoolExecutor
.
schedule
(
this
,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
,
TimeUnit
.
MILLISECONDS
);
if
((
queueMaxSpanFlowControlTimes
++
%
1000
)
==
0
)
{
log
.
warn
(
"The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}"
,
processQueue
.
getMsgTreeMap
().
firstKey
(),
processQueue
.
getMsgTreeMap
().
lastKey
(),
processQueue
.
getMaxSpan
(),
queueMaxSpanFlowControlTimes
);
}
return
;
}
long
offset
=
nextPullOffset
(
messageQueue
);
long
pullDelayTimeMills
=
0
;
try
{
SubscriptionData
subscriptionData
;
if
(
subscriptionType
==
SubscriptionType
.
SUBSCRIBE
)
{
String
topic
=
this
.
messageQueue
.
getTopic
();
subscriptionData
=
rebalanceImpl
.
getSubscriptionInner
().
get
(
topic
);
}
else
{
String
topic
=
this
.
messageQueue
.
getTopic
();
subscriptionData
=
FilterAPI
.
buildSubscriptionData
(
defaultLitePullConsumer
.
getConsumerGroup
(),
topic
,
SubscriptionData
.
SUB_ALL
);
}
PullResult
pullResult
=
pull
(
messageQueue
,
subscriptionData
,
offset
,
nextPullBatchSize
());
switch
(
pullResult
.
getPullStatus
())
{
case
FOUND:
final
Object
objLock
=
messageQueueLock
.
fetchLockObject
(
messageQueue
);
synchronized
(
objLock
)
{
if
(
pullResult
.
getMsgFoundList
()
!=
null
&&
!
pullResult
.
getMsgFoundList
().
isEmpty
()
&&
assignedMessageQueue
.
getSeekOffset
(
messageQueue
)
==
-
1
)
{
processQueue
.
putMessage
(
pullResult
.
getMsgFoundList
());
submitConsumeRequest
(
new
ConsumeRequest
(
pullResult
.
getMsgFoundList
(),
messageQueue
,
processQueue
));
}
}
break
;
case
OFFSET_ILLEGAL:
log
.
warn
(
"The pull request offset illegal, {}"
,
pullResult
.
toString
());
break
;
default
:
break
;
}
updatePullOffset
(
messageQueue
,
pullResult
.
getNextBeginOffset
());
}
catch
(
Throwable
e
)
{
pullDelayTimeMills
=
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION
;
log
.
error
(
"An error occurred in pull message process."
,
e
);
}
if
(!
this
.
isCancelled
())
{
scheduledThreadPoolExecutor
.
schedule
(
this
,
pullDelayTimeMills
,
TimeUnit
.
MILLISECONDS
);
}
else
{
log
.
warn
(
"The Pull Task is cancelled after doPullTask, {}"
,
messageQueue
);
}
}
}
public
boolean
isCancelled
()
{
return
cancelled
;
}
public
void
setCancelled
(
boolean
cancelled
)
{
this
.
cancelled
=
cancelled
;
}
public
MessageQueue
getMessageQueue
()
{
return
messageQueue
;
}
}
private
PullResult
pull
(
MessageQueue
mq
,
SubscriptionData
subscriptionData
,
long
offset
,
int
maxNums
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
pull
(
mq
,
subscriptionData
,
offset
,
maxNums
,
this
.
defaultLitePullConsumer
.
getConsumerPullTimeoutMillis
());
}
private
PullResult
pull
(
MessageQueue
mq
,
SubscriptionData
subscriptionData
,
long
offset
,
int
maxNums
,
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
this
.
pullSyncImpl
(
mq
,
subscriptionData
,
offset
,
maxNums
,
true
,
timeout
);
}
private
PullResult
pullSyncImpl
(
MessageQueue
mq
,
SubscriptionData
subscriptionData
,
long
offset
,
int
maxNums
,
boolean
block
,
long
timeout
)
throws
MQClientException
,
RemotingException
,
MQBrokerException
,
InterruptedException
{
if
(
null
==
mq
)
{
throw
new
MQClientException
(
"mq is null"
,
null
);
}
if
(
offset
<
0
)
{
throw
new
MQClientException
(
"offset < 0"
,
null
);
}
if
(
maxNums
<=
0
)
{
throw
new
MQClientException
(
"maxNums <= 0"
,
null
);
}
int
sysFlag
=
PullSysFlag
.
buildSysFlag
(
false
,
block
,
true
,
false
);
long
timeoutMillis
=
block
?
this
.
defaultLitePullConsumer
.
getConsumerTimeoutMillisWhenSuspend
()
:
timeout
;
boolean
isTagType
=
ExpressionType
.
isTagType
(
subscriptionData
.
getExpressionType
());
PullResult
pullResult
=
this
.
pullAPIWrapper
.
pullKernelImpl
(
mq
,
subscriptionData
.
getSubString
(),
subscriptionData
.
getExpressionType
(),
isTagType
?
0L
:
subscriptionData
.
getSubVersion
(),
offset
,
maxNums
,
sysFlag
,
0
,
this
.
defaultLitePullConsumer
.
getBrokerSuspendMaxTimeMillis
(),
timeoutMillis
,
CommunicationMode
.
SYNC
,
null
);
this
.
pullAPIWrapper
.
processPullResult
(
mq
,
pullResult
,
subscriptionData
);
return
pullResult
;
}
private
void
resetTopic
(
List
<
MessageExt
>
msgList
)
{
if
(
null
==
msgList
||
msgList
.
size
()
==
0
)
{
return
;
}
//If namespace not null , reset Topic without namespace.
for
(
MessageExt
messageExt
:
msgList
)
{
if
(
null
!=
this
.
defaultLitePullConsumer
.
getNamespace
())
{
messageExt
.
setTopic
(
NamespaceUtil
.
withoutNamespace
(
messageExt
.
getTopic
(),
this
.
defaultLitePullConsumer
.
getNamespace
()));
}
}
}
public
void
updateConsumeOffset
(
MessageQueue
mq
,
long
offset
)
{
checkServiceState
();
this
.
offsetStore
.
updateOffset
(
mq
,
offset
,
false
);
}
@Override
public
String
groupName
()
{
return
this
.
defaultLitePullConsumer
.
getConsumerGroup
();
}
@Override
public
MessageModel
messageModel
()
{
return
this
.
defaultLitePullConsumer
.
getMessageModel
();
}
@Override
public
ConsumeType
consumeType
()
{
return
ConsumeType
.
CONSUME_ACTIVELY
;
}
@Override
public
ConsumeFromWhere
consumeFromWhere
()
{
return
ConsumeFromWhere
.
CONSUME_FROM_LAST_OFFSET
;
}
@Override
public
Set
<
SubscriptionData
>
subscriptions
()
{
Set
<
SubscriptionData
>
subSet
=
new
HashSet
<
SubscriptionData
>();
subSet
.
addAll
(
this
.
rebalanceImpl
.
getSubscriptionInner
().
values
());
return
subSet
;
}
@Override
public
void
doRebalance
()
{
if
(
this
.
rebalanceImpl
!=
null
)
{
this
.
rebalanceImpl
.
doRebalance
(
false
);
}
}
@Override
public
void
persistConsumerOffset
()
{
try
{
checkServiceState
();
Set
<
MessageQueue
>
mqs
=
new
HashSet
<
MessageQueue
>();
Set
<
MessageQueue
>
allocateMq
=
this
.
rebalanceImpl
.
getProcessQueueTable
().
keySet
();
mqs
.
addAll
(
allocateMq
);
this
.
offsetStore
.
persistAll
(
mqs
);
}
catch
(
Exception
e
)
{
log
.
error
(
"group: "
+
this
.
defaultLitePullConsumer
.
getConsumerGroup
()
+
" persistConsumerOffset exception"
,
e
);
}
}
@Override
public
void
updateTopicSubscribeInfo
(
String
topic
,
Set
<
MessageQueue
>
info
)
{
Map
<
String
,
SubscriptionData
>
subTable
=
this
.
rebalanceImpl
.
getSubscriptionInner
();
if
(
subTable
!=
null
)
{
if
(
subTable
.
containsKey
(
topic
))
{
this
.
rebalanceImpl
.
getTopicSubscribeInfoTable
().
put
(
topic
,
info
);
}
}
}
@Override
public
boolean
isSubscribeTopicNeedUpdate
(
String
topic
)
{
Map
<
String
,
SubscriptionData
>
subTable
=
this
.
rebalanceImpl
.
getSubscriptionInner
();
if
(
subTable
!=
null
)
{
if
(
subTable
.
containsKey
(
topic
))
{
return
!
this
.
rebalanceImpl
.
topicSubscribeInfoTable
.
containsKey
(
topic
);
}
}
return
false
;
}
@Override
public
boolean
isUnitMode
()
{
return
this
.
defaultLitePullConsumer
.
isUnitMode
();
}
@Override
public
ConsumerRunningInfo
consumerRunningInfo
()
{
ConsumerRunningInfo
info
=
new
ConsumerRunningInfo
();
Properties
prop
=
MixAll
.
object2Properties
(
this
.
defaultLitePullConsumer
);
prop
.
put
(
ConsumerRunningInfo
.
PROP_CONSUMER_START_TIMESTAMP
,
String
.
valueOf
(
this
.
consumerStartTimestamp
));
info
.
setProperties
(
prop
);
info
.
getSubscriptionSet
().
addAll
(
this
.
subscriptions
());
return
info
;
}
private
void
updateConsumeOffsetToBroker
(
MessageQueue
mq
,
long
offset
,
boolean
isOneway
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
this
.
offsetStore
.
updateConsumeOffsetToBroker
(
mq
,
offset
,
isOneway
);
}
public
OffsetStore
getOffsetStore
()
{
return
offsetStore
;
}
public
DefaultLitePullConsumer
getDefaultLitePullConsumer
()
{
return
defaultLitePullConsumer
;
}
public
Set
<
MessageQueue
>
fetchMessageQueues
(
String
topic
)
throws
MQClientException
{
checkServiceState
();
Set
<
MessageQueue
>
result
=
this
.
mQClientFactory
.
getMQAdminImpl
().
fetchSubscribeMessageQueues
(
topic
);
return
parseMessageQueues
(
result
);
}
private
synchronized
void
fetchTopicMessageQueuesAndCompare
()
throws
MQClientException
{
for
(
Map
.
Entry
<
String
,
TopicMessageQueueChangeListener
>
entry
:
topicMessageQueueChangeListenerMap
.
entrySet
())
{
String
topic
=
entry
.
getKey
();
TopicMessageQueueChangeListener
topicMessageQueueChangeListener
=
entry
.
getValue
();
Set
<
MessageQueue
>
oldMessageQueues
=
messageQueuesForTopic
.
get
(
topic
);
Set
<
MessageQueue
>
newMessageQueues
=
fetchMessageQueues
(
topic
);
boolean
isChanged
=
!
isSetEqual
(
newMessageQueues
,
oldMessageQueues
);
if
(
isChanged
)
{
messageQueuesForTopic
.
put
(
topic
,
newMessageQueues
);
if
(
topicMessageQueueChangeListener
!=
null
)
{
topicMessageQueueChangeListener
.
onChanged
(
topic
,
newMessageQueues
);
}
}
}
}
private
boolean
isSetEqual
(
Set
<
MessageQueue
>
set1
,
Set
<
MessageQueue
>
set2
)
{
if
(
set1
==
null
&&
set2
==
null
)
{
return
true
;
}
if
(
set1
==
null
||
set2
==
null
||
set1
.
size
()
!=
set2
.
size
()
||
set1
.
size
()
==
0
||
set2
.
size
()
==
0
)
{
return
false
;
}
Iterator
iter
=
set2
.
iterator
();
boolean
isEqual
=
true
;
while
(
iter
.
hasNext
())
{
if
(!
set1
.
contains
(
iter
.
next
()))
{
isEqual
=
false
;
}
}
return
isEqual
;
}
public
synchronized
void
registerTopicMessageQueueChangeListener
(
String
topic
,
TopicMessageQueueChangeListener
listener
)
throws
MQClientException
{
if
(
topic
==
null
||
listener
==
null
)
{
throw
new
MQClientException
(
"Topic or listener is null"
,
null
);
}
if
(
topicMessageQueueChangeListenerMap
.
containsKey
(
topic
))
{
log
.
warn
(
"Topic {} had been registered, new listener will overwrite the old one"
,
topic
);
}
topicMessageQueueChangeListenerMap
.
put
(
topic
,
listener
);
if
(
this
.
serviceState
==
ServiceState
.
RUNNING
)
{
Set
<
MessageQueue
>
messageQueues
=
fetchMessageQueues
(
topic
);
messageQueuesForTopic
.
put
(
topic
,
messageQueues
);
}
}
private
Set
<
MessageQueue
>
parseMessageQueues
(
Set
<
MessageQueue
>
queueSet
)
{
Set
<
MessageQueue
>
resultQueues
=
new
HashSet
<
MessageQueue
>();
for
(
MessageQueue
messageQueue
:
queueSet
)
{
String
userTopic
=
NamespaceUtil
.
withoutNamespace
(
messageQueue
.
getTopic
(),
this
.
defaultLitePullConsumer
.
getNamespace
());
resultQueues
.
add
(
new
MessageQueue
(
userTopic
,
messageQueue
.
getBrokerName
(),
messageQueue
.
getQueueId
()));
}
return
resultQueues
;
}
public
class
ConsumeRequest
{
private
final
List
<
MessageExt
>
messageExts
;
private
final
MessageQueue
messageQueue
;
private
final
ProcessQueue
processQueue
;
public
ConsumeRequest
(
final
List
<
MessageExt
>
messageExts
,
final
MessageQueue
messageQueue
,
final
ProcessQueue
processQueue
)
{
this
.
messageExts
=
messageExts
;
this
.
messageQueue
=
messageQueue
;
this
.
processQueue
=
processQueue
;
}
public
List
<
MessageExt
>
getMessageExts
()
{
return
messageExts
;
}
public
MessageQueue
getMessageQueue
()
{
return
messageQueue
;
}
public
ProcessQueue
getProcessQueue
()
{
return
processQueue
;
}
}
}
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
浏览文件 @
9edeb4ec
...
...
@@ -66,6 +66,11 @@ import org.apache.rocketmq.remoting.RPCHook;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
/**
* This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumerImpl} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public
class
DefaultMQPullConsumerImpl
implements
MQConsumerInner
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
private
final
DefaultMQPullConsumer
defaultMQPullConsumer
;
...
...
@@ -74,7 +79,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private
final
ArrayList
<
ConsumeMessageHook
>
consumeMessageHookList
=
new
ArrayList
<
ConsumeMessageHook
>();
private
final
ArrayList
<
FilterMessageHook
>
filterMessageHookList
=
new
ArrayList
<
FilterMessageHook
>();
private
volatile
ServiceState
serviceState
=
ServiceState
.
CREATE_JUST
;
pr
ivate
MQClientInstance
mQClientFactory
;
pr
otected
MQClientInstance
mQClientFactory
;
private
PullAPIWrapper
pullAPIWrapper
;
private
OffsetStore
offsetStore
;
private
RebalanceImpl
rebalanceImpl
=
new
RebalancePullImpl
(
this
);
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
浏览文件 @
9edeb4ec
...
...
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
import
java.util.concurrent.locks.ReadWriteLock
;
import
java.util.concurrent.locks.ReentrantLock
;
import
java.util.concurrent.locks.ReentrantReadWriteLock
;
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.logging.InternalLogger
;
...
...
@@ -431,4 +432,5 @@ public class ProcessQueue {
public
void
setLastConsumeTimestamp
(
long
lastConsumeTimestamp
)
{
this
.
lastConsumeTimestamp
=
lastConsumeTimestamp
;
}
}
client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
浏览文件 @
9edeb4ec
...
...
@@ -41,8 +41,10 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData
;
/**
* Base class for rebalance algorithm
* This class will be removed in 2022, and a better implementation {@link RebalanceLitePullImpl} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public
abstract
class
RebalanceImpl
{
protected
static
final
InternalLogger
log
=
ClientLogger
.
getLog
();
protected
final
ConcurrentMap
<
MessageQueue
,
ProcessQueue
>
processQueueTable
=
new
ConcurrentHashMap
<
MessageQueue
,
ProcessQueue
>(
64
);
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.impl.consumer
;
import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy
;
import
org.apache.rocketmq.client.consumer.MessageQueueListener
;
import
org.apache.rocketmq.client.impl.factory.MQClientInstance
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumeType
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
java.util.List
;
import
java.util.Set
;
public
class
RebalanceLitePullImpl
extends
RebalanceImpl
{
private
final
DefaultLitePullConsumerImpl
litePullConsumerImpl
;
public
RebalanceLitePullImpl
(
DefaultLitePullConsumerImpl
litePullConsumerImpl
)
{
this
(
null
,
null
,
null
,
null
,
litePullConsumerImpl
);
}
public
RebalanceLitePullImpl
(
String
consumerGroup
,
MessageModel
messageModel
,
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
,
MQClientInstance
mQClientFactory
,
DefaultLitePullConsumerImpl
litePullConsumerImpl
)
{
super
(
consumerGroup
,
messageModel
,
allocateMessageQueueStrategy
,
mQClientFactory
);
this
.
litePullConsumerImpl
=
litePullConsumerImpl
;
}
@Override
public
void
messageQueueChanged
(
String
topic
,
Set
<
MessageQueue
>
mqAll
,
Set
<
MessageQueue
>
mqDivided
)
{
MessageQueueListener
messageQueueListener
=
this
.
litePullConsumerImpl
.
getDefaultLitePullConsumer
().
getMessageQueueListener
();
if
(
messageQueueListener
!=
null
)
{
try
{
messageQueueListener
.
messageQueueChanged
(
topic
,
mqAll
,
mqDivided
);
}
catch
(
Throwable
e
)
{
log
.
error
(
"messageQueueChanged exception"
,
e
);
}
}
}
@Override
public
boolean
removeUnnecessaryMessageQueue
(
MessageQueue
mq
,
ProcessQueue
pq
)
{
this
.
litePullConsumerImpl
.
getOffsetStore
().
persist
(
mq
);
this
.
litePullConsumerImpl
.
getOffsetStore
().
removeOffset
(
mq
);
return
true
;
}
@Override
public
ConsumeType
consumeType
()
{
return
ConsumeType
.
CONSUME_ACTIVELY
;
}
@Override
public
void
removeDirtyOffset
(
final
MessageQueue
mq
)
{
this
.
litePullConsumerImpl
.
getOffsetStore
().
removeOffset
(
mq
);
}
@Override
public
long
computePullFromWhere
(
MessageQueue
mq
)
{
return
0
;
}
@Override
public
void
dispatchPullRequest
(
List
<
PullRequest
>
pullRequestList
)
{
}
}
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
浏览文件 @
9edeb4ec
...
...
@@ -246,10 +246,6 @@ public class MQClientInstance {
log
.
info
(
"the client factory [{}] start OK"
,
this
.
clientId
);
this
.
serviceState
=
ServiceState
.
RUNNING
;
break
;
case
RUNNING:
break
;
case
SHUTDOWN_ALREADY:
break
;
case
START_FAILED:
throw
new
MQClientException
(
"The Factory object["
+
this
.
getClientId
()
+
"] has been created before, and failed."
,
null
);
default
:
...
...
client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.client.consumer
;
import
java.io.ByteArrayOutputStream
;
import
java.lang.reflect.Field
;
import
java.net.InetSocketAddress
;
import
java.util.Collections
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Set
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.consumer.store.OffsetStore
;
import
org.apache.rocketmq.client.consumer.store.ReadOffsetType
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.impl.CommunicationMode
;
import
org.apache.rocketmq.client.impl.FindBrokerResult
;
import
org.apache.rocketmq.client.impl.MQAdminImpl
;
import
org.apache.rocketmq.client.impl.MQClientAPIImpl
;
import
org.apache.rocketmq.client.impl.MQClientManager
;
import
org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue
;
import
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl
;
import
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper
;
import
org.apache.rocketmq.client.impl.consumer.PullResultExt
;
import
org.apache.rocketmq.client.impl.consumer.RebalanceImpl
;
import
org.apache.rocketmq.client.impl.consumer.RebalanceService
;
import
org.apache.rocketmq.client.impl.factory.MQClientInstance
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.message.MessageClientExt
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.invocation.InvocationOnMock
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
org.mockito.stubbing.Answer
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
assertj
.
core
.
api
.
Fail
.
failBecauseExceptionWasNotThrown
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyBoolean
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyString
;
import
static
org
.
mockito
.
ArgumentMatchers
.
nullable
;
import
static
org
.
mockito
.
Mockito
.
doReturn
;
import
static
org
.
mockito
.
Mockito
.
spy
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
DefaultLitePullConsumerTest
{
@Spy
private
MQClientInstance
mQClientFactory
=
MQClientManager
.
getInstance
().
getAndCreateMQClientInstance
(
new
ClientConfig
());
@Mock
private
MQClientAPIImpl
mQClientAPIImpl
;
@Mock
private
MQAdminImpl
mQAdminImpl
;
private
RebalanceImpl
rebalanceImpl
;
private
OffsetStore
offsetStore
;
private
DefaultLitePullConsumerImpl
litePullConsumerImpl
;
private
String
consumerGroup
=
"LitePullConsumerGroup"
;
private
String
topic
=
"LitePullConsumerTest"
;
private
String
brokerName
=
"BrokerA"
;
private
boolean
flag
=
false
;
@Before
public
void
init
()
throws
Exception
{
Field
field
=
MQClientInstance
.
class
.
getDeclaredField
(
"rebalanceService"
);
field
.
setAccessible
(
true
);
RebalanceService
rebalanceService
=
(
RebalanceService
)
field
.
get
(
mQClientFactory
);
field
=
RebalanceService
.
class
.
getDeclaredField
(
"waitInterval"
);
field
.
setAccessible
(
true
);
field
.
set
(
rebalanceService
,
100
);
}
@Test
public
void
testAssign_PollMessageSuccess
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createStartLitePullConsumer
();
try
{
MessageQueue
messageQueue
=
createMessageQueue
();
litePullConsumer
.
assign
(
Collections
.
singletonList
(
messageQueue
));
List
<
MessageExt
>
result
=
litePullConsumer
.
poll
();
assertThat
(
result
.
get
(
0
).
getTopic
()).
isEqualTo
(
topic
);
assertThat
(
result
.
get
(
0
).
getBody
()).
isEqualTo
(
new
byte
[]
{
'a'
});
}
finally
{
litePullConsumer
.
shutdown
();
}
}
@Test
public
void
testSubscribe_PollMessageSuccess
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createSubscribeLitePullConsumer
();
try
{
Set
<
MessageQueue
>
messageQueueSet
=
new
HashSet
<
MessageQueue
>();
messageQueueSet
.
add
(
createMessageQueue
());
litePullConsumerImpl
.
updateTopicSubscribeInfo
(
topic
,
messageQueueSet
);
litePullConsumer
.
setPollTimeoutMillis
(
20
*
1000
);
List
<
MessageExt
>
result
=
litePullConsumer
.
poll
();
assertThat
(
result
.
get
(
0
).
getTopic
()).
isEqualTo
(
topic
);
assertThat
(
result
.
get
(
0
).
getBody
()).
isEqualTo
(
new
byte
[]
{
'a'
});
}
finally
{
litePullConsumer
.
shutdown
();
}
}
@Test
public
void
testSubscribe_BroadcastPollMessageSuccess
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createBroadcastLitePullConsumer
();
try
{
Set
<
MessageQueue
>
messageQueueSet
=
new
HashSet
<
MessageQueue
>();
messageQueueSet
.
add
(
createMessageQueue
());
litePullConsumerImpl
.
updateTopicSubscribeInfo
(
topic
,
messageQueueSet
);
litePullConsumer
.
setPollTimeoutMillis
(
20
*
1000
);
List
<
MessageExt
>
result
=
litePullConsumer
.
poll
();
assertThat
(
result
.
get
(
0
).
getTopic
()).
isEqualTo
(
topic
);
assertThat
(
result
.
get
(
0
).
getBody
()).
isEqualTo
(
new
byte
[]
{
'a'
});
}
finally
{
litePullConsumer
.
shutdown
();
}
}
@Test
public
void
testSubscriptionType_AssignAndSubscribeExclusive
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createStartLitePullConsumer
();
try
{
litePullConsumer
.
subscribe
(
topic
,
"*"
);
litePullConsumer
.
assign
(
Collections
.
singletonList
(
createMessageQueue
()));
failBecauseExceptionWasNotThrown
(
IllegalStateException
.
class
);
}
catch
(
IllegalStateException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"Subscribe and assign are mutually exclusive."
);
}
finally
{
litePullConsumer
.
shutdown
();
}
}
@Test
public
void
testFetchMessageQueues_FetchMessageQueuesBeforeStart
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createNotStartLitePullConsumer
();
try
{
litePullConsumer
.
fetchMessageQueues
(
topic
);
failBecauseExceptionWasNotThrown
(
IllegalStateException
.
class
);
}
catch
(
IllegalStateException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"The consumer not running, please start it first."
);
}
finally
{
litePullConsumer
.
shutdown
();
}
}
@Test
public
void
testSeek_SeekOffsetSuccess
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createStartLitePullConsumer
();
when
(
mQAdminImpl
.
minOffset
(
any
(
MessageQueue
.
class
))).
thenReturn
(
0L
);
when
(
mQAdminImpl
.
maxOffset
(
any
(
MessageQueue
.
class
))).
thenReturn
(
500L
);
MessageQueue
messageQueue
=
createMessageQueue
();
litePullConsumer
.
assign
(
Collections
.
singletonList
(
messageQueue
));
long
offset
=
litePullConsumer
.
committed
(
messageQueue
);
litePullConsumer
.
seek
(
messageQueue
,
offset
);
Field
field
=
DefaultLitePullConsumerImpl
.
class
.
getDeclaredField
(
"assignedMessageQueue"
);
field
.
setAccessible
(
true
);
AssignedMessageQueue
assignedMessageQueue
=
(
AssignedMessageQueue
)
field
.
get
(
litePullConsumerImpl
);
assertEquals
(
assignedMessageQueue
.
getSeekOffset
(
messageQueue
),
offset
);
litePullConsumer
.
shutdown
();
}
@Test
public
void
testSeek_SeekOffsetIllegal
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createStartLitePullConsumer
();
when
(
mQAdminImpl
.
minOffset
(
any
(
MessageQueue
.
class
))).
thenReturn
(
0L
);
when
(
mQAdminImpl
.
maxOffset
(
any
(
MessageQueue
.
class
))).
thenReturn
(
100L
);
MessageQueue
messageQueue
=
createMessageQueue
();
litePullConsumer
.
assign
(
Collections
.
singletonList
(
messageQueue
));
try
{
litePullConsumer
.
seek
(
messageQueue
,
-
1
);
failBecauseExceptionWasNotThrown
(
MQClientException
.
class
);
}
catch
(
MQClientException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"min offset = 0"
);
}
try
{
litePullConsumer
.
seek
(
messageQueue
,
1000
);
failBecauseExceptionWasNotThrown
(
MQClientException
.
class
);
}
catch
(
MQClientException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"max offset = 100"
);
}
litePullConsumer
.
shutdown
();
}
@Test
public
void
testSeek_MessageQueueNotInAssignList
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createStartLitePullConsumer
();
try
{
litePullConsumer
.
seek
(
createMessageQueue
(),
0
);
failBecauseExceptionWasNotThrown
(
MQClientException
.
class
);
}
catch
(
MQClientException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"The message queue is not in assigned list"
);
}
finally
{
litePullConsumer
.
shutdown
();
}
litePullConsumer
=
createSubscribeLitePullConsumer
();
try
{
litePullConsumer
.
seek
(
createMessageQueue
(),
0
);
failBecauseExceptionWasNotThrown
(
MQClientException
.
class
);
}
catch
(
MQClientException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"The message queue is not in assigned list, may be rebalancing"
);
}
finally
{
litePullConsumer
.
shutdown
();
}
}
@Test
public
void
testOffsetForTimestamp_FailedAndSuccess
()
throws
Exception
{
MessageQueue
messageQueue
=
createMessageQueue
();
DefaultLitePullConsumer
litePullConsumer
=
createNotStartLitePullConsumer
();
try
{
litePullConsumer
.
offsetForTimestamp
(
messageQueue
,
123456L
);
failBecauseExceptionWasNotThrown
(
IllegalStateException
.
class
);
}
catch
(
IllegalStateException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"The consumer not running, please start it first."
);
}
finally
{
litePullConsumer
.
shutdown
();
}
doReturn
(
123L
).
when
(
mQAdminImpl
).
searchOffset
(
any
(
MessageQueue
.
class
),
anyLong
());
litePullConsumer
=
createStartLitePullConsumer
();
long
offset
=
litePullConsumer
.
offsetForTimestamp
(
messageQueue
,
123456L
);
assertThat
(
offset
).
isEqualTo
(
123L
);
}
@Test
public
void
testPauseAndResume_Success
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createNotStartLitePullConsumer
();
try
{
MessageQueue
messageQueue
=
createMessageQueue
();
litePullConsumer
.
assign
(
Collections
.
singletonList
(
messageQueue
));
litePullConsumer
.
pause
(
Collections
.
singletonList
(
messageQueue
));
litePullConsumer
.
start
();
initDefaultLitePullConsumer
(
litePullConsumer
);
List
<
MessageExt
>
result
=
litePullConsumer
.
poll
();
assertThat
(
result
.
isEmpty
()).
isTrue
();
litePullConsumer
.
resume
(
Collections
.
singletonList
(
messageQueue
));
result
=
litePullConsumer
.
poll
();
assertThat
(
result
.
get
(
0
).
getTopic
()).
isEqualTo
(
topic
);
assertThat
(
result
.
get
(
0
).
getBody
()).
isEqualTo
(
new
byte
[]
{
'a'
});
}
finally
{
litePullConsumer
.
shutdown
();
}
}
@Test
public
void
testRegisterTopicMessageQueueChangeListener_Success
()
throws
Exception
{
flag
=
false
;
DefaultLitePullConsumer
litePullConsumer
=
createStartLitePullConsumer
();
doReturn
(
Collections
.
emptySet
()).
when
(
mQAdminImpl
).
fetchSubscribeMessageQueues
(
anyString
());
litePullConsumer
.
setTopicMetadataCheckIntervalMillis
(
10
);
litePullConsumer
.
registerTopicMessageQueueChangeListener
(
topic
,
new
TopicMessageQueueChangeListener
()
{
@Override
public
void
onChanged
(
String
topic
,
Set
<
MessageQueue
>
messageQueues
)
{
flag
=
true
;
}
});
Set
<
MessageQueue
>
set
=
new
HashSet
<
MessageQueue
>();
set
.
add
(
createMessageQueue
());
doReturn
(
set
).
when
(
mQAdminImpl
).
fetchSubscribeMessageQueues
(
anyString
());
Thread
.
sleep
(
11
*
1000
);
assertThat
(
flag
).
isTrue
();
}
@Test
public
void
testFlowControl_Success
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
createStartLitePullConsumer
();
try
{
MessageQueue
messageQueue
=
createMessageQueue
();
litePullConsumer
.
setPullThresholdForAll
(-
1
);
litePullConsumer
.
assign
(
Collections
.
singletonList
(
messageQueue
));
litePullConsumer
.
setPollTimeoutMillis
(
500
);
List
<
MessageExt
>
result
=
litePullConsumer
.
poll
();
assertThat
(
result
).
isEmpty
();
}
finally
{
litePullConsumer
.
shutdown
();
}
litePullConsumer
=
createStartLitePullConsumer
();
try
{
MessageQueue
messageQueue
=
createMessageQueue
();
litePullConsumer
.
setPullThresholdForQueue
(-
1
);
litePullConsumer
.
assign
(
Collections
.
singletonList
(
messageQueue
));
litePullConsumer
.
setPollTimeoutMillis
(
500
);
List
<
MessageExt
>
result
=
litePullConsumer
.
poll
();
assertThat
(
result
).
isEmpty
();
}
finally
{
litePullConsumer
.
shutdown
();
}
litePullConsumer
=
createStartLitePullConsumer
();
try
{
MessageQueue
messageQueue
=
createMessageQueue
();
litePullConsumer
.
setPullThresholdSizeForQueue
(-
1
);
litePullConsumer
.
assign
(
Collections
.
singletonList
(
messageQueue
));
litePullConsumer
.
setPollTimeoutMillis
(
500
);
List
<
MessageExt
>
result
=
litePullConsumer
.
poll
();
assertThat
(
result
).
isEmpty
();
}
finally
{
litePullConsumer
.
shutdown
();
}
litePullConsumer
=
createStartLitePullConsumer
();
try
{
MessageQueue
messageQueue
=
createMessageQueue
();
litePullConsumer
.
setConsumeMaxSpan
(-
1
);
litePullConsumer
.
assign
(
Collections
.
singletonList
(
messageQueue
));
litePullConsumer
.
setPollTimeoutMillis
(
500
);
List
<
MessageExt
>
result
=
litePullConsumer
.
poll
();
assertThat
(
result
).
isEmpty
();
}
finally
{
litePullConsumer
.
shutdown
();
}
}
@Test
public
void
testCheckConfig_Exception
()
{
DefaultLitePullConsumer
litePullConsumer
=
new
DefaultLitePullConsumer
(
MixAll
.
DEFAULT_CONSUMER_GROUP
);
try
{
litePullConsumer
.
start
();
failBecauseExceptionWasNotThrown
(
MQClientException
.
class
);
}
catch
(
MQClientException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"consumerGroup can not equal"
);
}
finally
{
litePullConsumer
.
shutdown
();
}
litePullConsumer
=
new
DefaultLitePullConsumer
(
consumerGroup
+
System
.
currentTimeMillis
());
litePullConsumer
.
setMessageModel
(
null
);
try
{
litePullConsumer
.
start
();
failBecauseExceptionWasNotThrown
(
MQClientException
.
class
);
}
catch
(
MQClientException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"messageModel is null"
);
}
finally
{
litePullConsumer
.
shutdown
();
}
litePullConsumer
=
new
DefaultLitePullConsumer
(
consumerGroup
+
System
.
currentTimeMillis
());
litePullConsumer
.
setAllocateMessageQueueStrategy
(
null
);
try
{
litePullConsumer
.
start
();
failBecauseExceptionWasNotThrown
(
MQClientException
.
class
);
}
catch
(
MQClientException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"allocateMessageQueueStrategy is null"
);
}
finally
{
litePullConsumer
.
shutdown
();
}
litePullConsumer
=
new
DefaultLitePullConsumer
(
consumerGroup
+
System
.
currentTimeMillis
());
litePullConsumer
.
setConsumerTimeoutMillisWhenSuspend
(
1
);
try
{
litePullConsumer
.
start
();
failBecauseExceptionWasNotThrown
(
MQClientException
.
class
);
}
catch
(
MQClientException
e
)
{
assertThat
(
e
).
hasMessageContaining
(
"Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
);
}
finally
{
litePullConsumer
.
shutdown
();
}
}
private
void
initDefaultLitePullConsumer
(
DefaultLitePullConsumer
litePullConsumer
)
throws
Exception
{
Field
field
=
DefaultLitePullConsumer
.
class
.
getDeclaredField
(
"defaultLitePullConsumerImpl"
);
field
.
setAccessible
(
true
);
litePullConsumerImpl
=
(
DefaultLitePullConsumerImpl
)
field
.
get
(
litePullConsumer
);
field
=
DefaultLitePullConsumerImpl
.
class
.
getDeclaredField
(
"mQClientFactory"
);
field
.
setAccessible
(
true
);
field
.
set
(
litePullConsumerImpl
,
mQClientFactory
);
PullAPIWrapper
pullAPIWrapper
=
litePullConsumerImpl
.
getPullAPIWrapper
();
field
=
PullAPIWrapper
.
class
.
getDeclaredField
(
"mQClientFactory"
);
field
.
setAccessible
(
true
);
field
.
set
(
pullAPIWrapper
,
mQClientFactory
);
field
=
MQClientInstance
.
class
.
getDeclaredField
(
"mQClientAPIImpl"
);
field
.
setAccessible
(
true
);
field
.
set
(
mQClientFactory
,
mQClientAPIImpl
);
field
=
MQClientInstance
.
class
.
getDeclaredField
(
"mQAdminImpl"
);
field
.
setAccessible
(
true
);
field
.
set
(
mQClientFactory
,
mQAdminImpl
);
field
=
DefaultLitePullConsumerImpl
.
class
.
getDeclaredField
(
"rebalanceImpl"
);
field
.
setAccessible
(
true
);
rebalanceImpl
=
(
RebalanceImpl
)
field
.
get
(
litePullConsumerImpl
);
field
=
RebalanceImpl
.
class
.
getDeclaredField
(
"mQClientFactory"
);
field
.
setAccessible
(
true
);
field
.
set
(
rebalanceImpl
,
mQClientFactory
);
offsetStore
=
spy
(
litePullConsumerImpl
.
getOffsetStore
());
field
=
DefaultLitePullConsumerImpl
.
class
.
getDeclaredField
(
"offsetStore"
);
field
.
setAccessible
(
true
);
field
.
set
(
litePullConsumerImpl
,
offsetStore
);
when
(
mQClientFactory
.
getMQClientAPIImpl
().
pullMessage
(
anyString
(),
any
(
PullMessageRequestHeader
.
class
),
anyLong
(),
any
(
CommunicationMode
.
class
),
nullable
(
PullCallback
.
class
)))
.
thenAnswer
(
new
Answer
<
Object
>()
{
@Override
public
Object
answer
(
InvocationOnMock
mock
)
throws
Throwable
{
PullMessageRequestHeader
requestHeader
=
mock
.
getArgument
(
1
);
MessageClientExt
messageClientExt
=
new
MessageClientExt
();
messageClientExt
.
setTopic
(
topic
);
messageClientExt
.
setQueueId
(
0
);
messageClientExt
.
setMsgId
(
"123"
);
messageClientExt
.
setBody
(
new
byte
[]
{
'a'
});
messageClientExt
.
setOffsetMsgId
(
"234"
);
messageClientExt
.
setBornHost
(
new
InetSocketAddress
(
8080
));
messageClientExt
.
setStoreHost
(
new
InetSocketAddress
(
8080
));
PullResult
pullResult
=
createPullResult
(
requestHeader
,
PullStatus
.
FOUND
,
Collections
.<
MessageExt
>
singletonList
(
messageClientExt
));
return
pullResult
;
}
});
when
(
mQClientFactory
.
findBrokerAddressInSubscribe
(
anyString
(),
anyLong
(),
anyBoolean
())).
thenReturn
(
new
FindBrokerResult
(
"127.0.0.1:10911"
,
false
));
doReturn
(
Collections
.
singletonList
(
mQClientFactory
.
getClientId
())).
when
(
mQClientFactory
).
findConsumerIdList
(
anyString
(),
anyString
());
doReturn
(
123L
).
when
(
offsetStore
).
readOffset
(
any
(
MessageQueue
.
class
),
any
(
ReadOffsetType
.
class
));
}
private
DefaultLitePullConsumer
createSubscribeLitePullConsumer
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
new
DefaultLitePullConsumer
(
consumerGroup
+
System
.
currentTimeMillis
());
litePullConsumer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
litePullConsumer
.
subscribe
(
topic
,
"*"
);
litePullConsumer
.
start
();
initDefaultLitePullConsumer
(
litePullConsumer
);
return
litePullConsumer
;
}
private
DefaultLitePullConsumer
createStartLitePullConsumer
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
new
DefaultLitePullConsumer
(
consumerGroup
+
System
.
currentTimeMillis
());
litePullConsumer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
litePullConsumer
.
start
();
initDefaultLitePullConsumer
(
litePullConsumer
);
return
litePullConsumer
;
}
private
DefaultLitePullConsumer
createNotStartLitePullConsumer
()
{
DefaultLitePullConsumer
litePullConsumer
=
new
DefaultLitePullConsumer
(
consumerGroup
+
System
.
currentTimeMillis
());
return
litePullConsumer
;
}
private
DefaultLitePullConsumer
createBroadcastLitePullConsumer
()
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
new
DefaultLitePullConsumer
(
consumerGroup
+
System
.
currentTimeMillis
());
litePullConsumer
.
setNamesrvAddr
(
"127.0.0.1:9876"
);
litePullConsumer
.
setMessageModel
(
MessageModel
.
BROADCASTING
);
litePullConsumer
.
subscribe
(
topic
,
"*"
);
litePullConsumer
.
start
();
initDefaultLitePullConsumer
(
litePullConsumer
);
return
litePullConsumer
;
}
private
MessageQueue
createMessageQueue
()
{
MessageQueue
messageQueue
=
new
MessageQueue
();
messageQueue
.
setBrokerName
(
brokerName
);
messageQueue
.
setQueueId
(
0
);
messageQueue
.
setTopic
(
topic
);
return
messageQueue
;
}
private
PullResultExt
createPullResult
(
PullMessageRequestHeader
requestHeader
,
PullStatus
pullStatus
,
List
<
MessageExt
>
messageExtList
)
throws
Exception
{
ByteArrayOutputStream
outputStream
=
new
ByteArrayOutputStream
();
for
(
MessageExt
messageExt
:
messageExtList
)
{
outputStream
.
write
(
MessageDecoder
.
encode
(
messageExt
,
false
));
}
return
new
PullResultExt
(
pullStatus
,
requestHeader
.
getQueueOffset
()
+
messageExtList
.
size
(),
123
,
2048
,
messageExtList
,
0
,
outputStream
.
toByteArray
());
}
}
example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
浏览文件 @
9edeb4ec
...
...
@@ -50,4 +50,4 @@ public class PushConsumer {
consumer
.
start
();
System
.
out
.
printf
(
"Broadcast Consumer Started.%n"
);
}
}
}
\ No newline at end of file
example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.example.simple
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.List
;
import
org.apache.rocketmq.client.consumer.DefaultLitePullConsumer
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
public
class
LitePullConsumerAssign
{
public
static
volatile
boolean
running
=
true
;
public
static
void
main
(
String
[]
args
)
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
new
DefaultLitePullConsumer
(
"please_rename_unique_group_name"
);
litePullConsumer
.
setAutoCommit
(
false
);
litePullConsumer
.
start
();
Collection
<
MessageQueue
>
mqSet
=
litePullConsumer
.
fetchMessageQueues
(
"TopicTest"
);
List
<
MessageQueue
>
list
=
new
ArrayList
<>(
mqSet
);
List
<
MessageQueue
>
assignList
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
list
.
size
()
/
2
;
i
++)
{
assignList
.
add
(
list
.
get
(
i
));
}
litePullConsumer
.
assign
(
assignList
);
litePullConsumer
.
seek
(
assignList
.
get
(
0
),
10
);
try
{
while
(
running
)
{
List
<
MessageExt
>
messageExts
=
litePullConsumer
.
poll
();
System
.
out
.
printf
(
"%s %n"
,
messageExts
);
litePullConsumer
.
commitSync
();
}
}
finally
{
litePullConsumer
.
shutdown
();
}
}
}
example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
0 → 100644
浏览文件 @
9edeb4ec
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.example.simple
;
import
java.util.List
;
import
org.apache.rocketmq.client.consumer.DefaultLitePullConsumer
;
import
org.apache.rocketmq.common.message.MessageExt
;
public
class
LitePullConsumerSubscribe
{
public
static
volatile
boolean
running
=
true
;
public
static
void
main
(
String
[]
args
)
throws
Exception
{
DefaultLitePullConsumer
litePullConsumer
=
new
DefaultLitePullConsumer
(
"please_rename_unique_group_name"
);
litePullConsumer
.
subscribe
(
"TopicTest"
,
"*"
);
litePullConsumer
.
start
();
try
{
while
(
running
)
{
List
<
MessageExt
>
messageExts
=
litePullConsumer
.
poll
();
System
.
out
.
printf
(
"%s%n"
,
messageExts
);
}
}
finally
{
litePullConsumer
.
shutdown
();
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录