Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
f32e0b9d
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
f32e0b9d
编写于
1月 13, 2017
作者:
Z
Zhanhui Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add javadoc to DefaultMQPushConsumer
上级
de628a44
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
178 addition
and
9 deletion
+178
-9
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
...pache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+178
-9
未找到文件。
client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
浏览文件 @
f32e0b9d
...
@@ -40,51 +40,116 @@ import org.apache.rocketmq.remoting.RPCHook;
...
@@ -40,51 +40,116 @@ import org.apache.rocketmq.remoting.RPCHook;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
/**
/**
* Wrapped push consumer.in fact,it works as remarkable as the pull consumer
* In most scenarios, this is the mostly recommended class to consume messages.
* </p>
*
* Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on
* arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages.
* </p>
*
* See quickstart/Consumer in the example module for a typical usage.
* </p>
*
* <p>
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* </p>
*/
*/
public
class
DefaultMQPushConsumer
extends
ClientConfig
implements
MQPushConsumer
{
public
class
DefaultMQPushConsumer
extends
ClientConfig
implements
MQPushConsumer
{
/**
* Internal implementation. Most of the functions herein are delegated to it.
*/
protected
final
transient
DefaultMQPushConsumerImpl
defaultMQPushConsumerImpl
;
protected
final
transient
DefaultMQPushConsumerImpl
defaultMQPushConsumerImpl
;
/**
/**
* Do the same thing for the same Group, the application must be set,and
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* guarantee Globally unique
* load balance. It's required and needs to be globally unique.
* </p>
*
* See <a href="http://rocketmq.incubator.apache.org/docs/core-concept/">here</a> for further discussion.
*/
*/
private
String
consumerGroup
;
private
String
consumerGroup
;
/**
/**
* Consumption pattern,default is clustering
* Message model defines the way how messages are delivered to each consumer clients.
* </p>
*
* RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
* the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
* balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
* separately.
* </p>
*
* This field defaults to clustering.
*/
*/
private
MessageModel
messageModel
=
MessageModel
.
CLUSTERING
;
private
MessageModel
messageModel
=
MessageModel
.
CLUSTERING
;
/**
/**
* Consumption offset
* Consuming point on consumer booting.
* </p>
*
* There are three consuming points:
* <ul>
* <li>
* <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
* If it were a newly booting up consumer client, according aging of the consumer group, there are two
* cases:
* <ol>
* <li>
* if the consumer group is created so recently that the earliest message being subscribed has yet
* expired, which means the consumer group represents a lately launched business, consuming will
* start from the very beginning;
* </li>
* <li>
* if the earliest message being subscribed has expired, consuming will start from the latest
* messages, meaning messages born prior to the booting timestamp would be ignored.
* </li>
* </ol>
* </li>
* <li>
* <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available.
* </li>
* <li>
* <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means
* messages born prior to {@link #consumeTimestamp} will be ignored
* </li>
* </ul>
*/
*/
private
ConsumeFromWhere
consumeFromWhere
=
ConsumeFromWhere
.
CONSUME_FROM_LAST_OFFSET
;
private
ConsumeFromWhere
consumeFromWhere
=
ConsumeFromWhere
.
CONSUME_FROM_LAST_OFFSET
;
/**
/**
* Backtracking consumption time with second precision.
t
ime format is
* Backtracking consumption time with second precision.
T
ime format is
* 20131223171201<br>
* 20131223171201<br>
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
* Default backtracking consumption time Half an hour ago
* Default backtracking consumption time Half an hour ago
.
*/
*/
private
String
consumeTimestamp
=
UtilAll
.
timeMillisToHumanString3
(
System
.
currentTimeMillis
()
-
(
1000
*
60
*
30
));
private
String
consumeTimestamp
=
UtilAll
.
timeMillisToHumanString3
(
System
.
currentTimeMillis
()
-
(
1000
*
60
*
30
));
/**
/**
* Queue allocation algorithm
* Queue allocation algorithm
specifying how message queues are allocated to each consumer clients.
*/
*/
private
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
;
private
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
;
/**
/**
* Subscription relationship
* Subscription relationship
*/
*/
private
Map
<
String
/* topic */
,
String
/* sub expression */
>
subscription
=
new
HashMap
<
String
,
String
>();
private
Map
<
String
/* topic */
,
String
/* sub expression */
>
subscription
=
new
HashMap
<>();
/**
/**
* Message listener
* Message listener
*/
*/
private
MessageListener
messageListener
;
private
MessageListener
messageListener
;
/**
/**
* Offset Storage
* Offset Storage
*/
*/
private
OffsetStore
offsetStore
;
private
OffsetStore
offsetStore
;
/**
/**
* Minimum consumer thread number
* Minimum consumer thread number
*/
*/
private
int
consumeThreadMin
=
20
;
private
int
consumeThreadMin
=
20
;
/**
/**
* Max consumer thread number
* Max consumer thread number
*/
*/
...
@@ -99,18 +164,22 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
...
@@ -99,18 +164,22 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Concurrently max span offset.it has no effect on sequential consumption
* Concurrently max span offset.it has no effect on sequential consumption
*/
*/
private
int
consumeConcurrentlyMaxSpan
=
2000
;
private
int
consumeConcurrentlyMaxSpan
=
2000
;
/**
/**
* Flow control threshold
* Flow control threshold
*/
*/
private
int
pullThresholdForQueue
=
1000
;
private
int
pullThresholdForQueue
=
1000
;
/**
/**
* Message pull Interval
* Message pull Interval
*/
*/
private
long
pullInterval
=
0
;
private
long
pullInterval
=
0
;
/**
/**
* Batch consumption size
* Batch consumption size
*/
*/
private
int
consumeMessageBatchMaxSize
=
1
;
private
int
consumeMessageBatchMaxSize
=
1
;
/**
/**
* Batch pull size
* Batch pull size
*/
*/
...
@@ -126,24 +195,56 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
...
@@ -126,24 +195,56 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
*/
private
boolean
unitMode
=
false
;
private
boolean
unitMode
=
false
;
/**
* Max re-consume times. -1 means 16 times.
* </p>
*
* If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
* queue waiting.
*/
private
int
maxReconsumeTimes
=
-
1
;
private
int
maxReconsumeTimes
=
-
1
;
/**
* Suspending pulling time for cases requiring slow pulling like flow-control scenario.
*/
private
long
suspendCurrentQueueTimeMillis
=
1000
;
private
long
suspendCurrentQueueTimeMillis
=
1000
;
/**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
private
long
consumeTimeout
=
15
;
private
long
consumeTimeout
=
15
;
/**
* Default constructor.
*/
public
DefaultMQPushConsumer
()
{
public
DefaultMQPushConsumer
()
{
this
(
MixAll
.
DEFAULT_CONSUMER_GROUP
,
null
,
new
AllocateMessageQueueAveragely
());
this
(
MixAll
.
DEFAULT_CONSUMER_GROUP
,
null
,
new
AllocateMessageQueueAveragely
());
}
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
*/
public
DefaultMQPushConsumer
(
final
String
consumerGroup
,
RPCHook
rpcHook
,
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
)
{
public
DefaultMQPushConsumer
(
final
String
consumerGroup
,
RPCHook
rpcHook
,
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
)
{
this
.
consumerGroup
=
consumerGroup
;
this
.
consumerGroup
=
consumerGroup
;
this
.
allocateMessageQueueStrategy
=
allocateMessageQueueStrategy
;
this
.
allocateMessageQueueStrategy
=
allocateMessageQueueStrategy
;
defaultMQPushConsumerImpl
=
new
DefaultMQPushConsumerImpl
(
this
,
rpcHook
);
defaultMQPushConsumerImpl
=
new
DefaultMQPushConsumerImpl
(
this
,
rpcHook
);
}
}
/**
* Constructor specifying RPC hook.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public
DefaultMQPushConsumer
(
RPCHook
rpcHook
)
{
public
DefaultMQPushConsumer
(
RPCHook
rpcHook
)
{
this
(
MixAll
.
DEFAULT_CONSUMER_GROUP
,
rpcHook
,
new
AllocateMessageQueueAveragely
());
this
(
MixAll
.
DEFAULT_CONSUMER_GROUP
,
rpcHook
,
new
AllocateMessageQueueAveragely
());
}
}
/**
* Constructor specifying consumer group.
* @param consumerGroup Consumer group.
*/
public
DefaultMQPushConsumer
(
final
String
consumerGroup
)
{
public
DefaultMQPushConsumer
(
final
String
consumerGroup
)
{
this
(
consumerGroup
,
null
,
new
AllocateMessageQueueAveragely
());
this
(
consumerGroup
,
null
,
new
AllocateMessageQueueAveragely
());
}
}
...
@@ -308,12 +409,33 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
...
@@ -308,12 +409,33 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this
.
subscription
=
subscription
;
this
.
subscription
=
subscription
;
}
}
/**
* Send message back to broker which will be re-delivered in future.
* @param msg Message to send back.
* @param delayLevel delay level.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
*/
@Override
@Override
public
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
)
public
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
this
.
defaultMQPushConsumerImpl
.
sendMessageBack
(
msg
,
delayLevel
,
null
);
this
.
defaultMQPushConsumerImpl
.
sendMessageBack
(
msg
,
delayLevel
,
null
);
}
}
/**
* Send message back to the broker whose name is <code>brokerName</code> and the message will be re-delivered in
* future.
*
* @param msg Message to send back.
* @param delayLevel delay level.
* @param brokerName broker name.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
*/
@Override
@Override
public
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
,
String
brokerName
)
public
void
sendMessageBack
(
MessageExt
msg
,
int
delayLevel
,
String
brokerName
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
...
@@ -325,11 +447,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
...
@@ -325,11 +447,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
return
this
.
defaultMQPushConsumerImpl
.
fetchSubscribeMessageQueues
(
topic
);
return
this
.
defaultMQPushConsumerImpl
.
fetchSubscribeMessageQueues
(
topic
);
}
}
/**
* This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
* @throws MQClientException if there is any client error.
*/
@Override
@Override
public
void
start
()
throws
MQClientException
{
public
void
start
()
throws
MQClientException
{
this
.
defaultMQPushConsumerImpl
.
start
();
this
.
defaultMQPushConsumerImpl
.
start
();
}
}
/**
* Shut down this client and releasing underlying resources.
*/
@Override
@Override
public
void
shutdown
()
{
public
void
shutdown
()
{
this
.
defaultMQPushConsumerImpl
.
shutdown
();
this
.
defaultMQPushConsumerImpl
.
shutdown
();
...
@@ -342,43 +471,83 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
...
@@ -342,43 +471,83 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this
.
defaultMQPushConsumerImpl
.
registerMessageListener
(
messageListener
);
this
.
defaultMQPushConsumerImpl
.
registerMessageListener
(
messageListener
);
}
}
/**
* Register a callback to execute on message arrival for concurrent consuming.
*
* @param messageListener message handling callback.
*/
@Override
@Override
public
void
registerMessageListener
(
MessageListenerConcurrently
messageListener
)
{
public
void
registerMessageListener
(
MessageListenerConcurrently
messageListener
)
{
this
.
messageListener
=
messageListener
;
this
.
messageListener
=
messageListener
;
this
.
defaultMQPushConsumerImpl
.
registerMessageListener
(
messageListener
);
this
.
defaultMQPushConsumerImpl
.
registerMessageListener
(
messageListener
);
}
}
/**
* Register a callback to execute on message arrival for orderly consuming.
*
* @param messageListener message handling callback.
*/
@Override
@Override
public
void
registerMessageListener
(
MessageListenerOrderly
messageListener
)
{
public
void
registerMessageListener
(
MessageListenerOrderly
messageListener
)
{
this
.
messageListener
=
messageListener
;
this
.
messageListener
=
messageListener
;
this
.
defaultMQPushConsumerImpl
.
registerMessageListener
(
messageListener
);
this
.
defaultMQPushConsumerImpl
.
registerMessageListener
(
messageListener
);
}
}
/**
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
* if null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
@Override
public
void
subscribe
(
String
topic
,
String
subExpression
)
throws
MQClientException
{
public
void
subscribe
(
String
topic
,
String
subExpression
)
throws
MQClientException
{
this
.
defaultMQPushConsumerImpl
.
subscribe
(
topic
,
subExpression
);
this
.
defaultMQPushConsumerImpl
.
subscribe
(
topic
,
subExpression
);
}
}
/**
* Subscribe a topic to consuming subscription.
* @param topic topic to consume.
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
* @throws MQClientException
*/
@Override
@Override
public
void
subscribe
(
String
topic
,
String
fullClassName
,
String
filterClassSource
)
throws
MQClientException
{
public
void
subscribe
(
String
topic
,
String
fullClassName
,
String
filterClassSource
)
throws
MQClientException
{
this
.
defaultMQPushConsumerImpl
.
subscribe
(
topic
,
fullClassName
,
filterClassSource
);
this
.
defaultMQPushConsumerImpl
.
subscribe
(
topic
,
fullClassName
,
filterClassSource
);
}
}
/**
* Un-subscribe the specified topic from subscription.
* @param topic message topic
*/
@Override
@Override
public
void
unsubscribe
(
String
topic
)
{
public
void
unsubscribe
(
String
topic
)
{
this
.
defaultMQPushConsumerImpl
.
unsubscribe
(
topic
);
this
.
defaultMQPushConsumerImpl
.
unsubscribe
(
topic
);
}
}
/**
* Update the message consuming thread core pool size.
*
* @param corePoolSize new core pool size.
*/
@Override
@Override
public
void
updateCorePoolSize
(
int
corePoolSize
)
{
public
void
updateCorePoolSize
(
int
corePoolSize
)
{
this
.
defaultMQPushConsumerImpl
.
updateCorePoolSize
(
corePoolSize
);
this
.
defaultMQPushConsumerImpl
.
updateCorePoolSize
(
corePoolSize
);
}
}
/**
* Suspend pulling new messages.
*/
@Override
@Override
public
void
suspend
()
{
public
void
suspend
()
{
this
.
defaultMQPushConsumerImpl
.
suspend
();
this
.
defaultMQPushConsumerImpl
.
suspend
();
}
}
/**
* Resume pulling.
*/
@Override
@Override
public
void
resume
()
{
public
void
resume
()
{
this
.
defaultMQPushConsumerImpl
.
resume
();
this
.
defaultMQPushConsumerImpl
.
resume
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录