提交 893e8fd8 编写于 作者: qingjiyuji's avatar qingjiyuji

code style(client):polish lite pull consumer code style

上级 61cd395d
...@@ -466,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -466,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, String subExpression) throws MQClientException { public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try { try {
if (topic == null || topic.equals("")) { if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty."); throw new IllegalArgumentException("Topic can not be null or empty.");
} }
setSubscriptionType(SubscriptionType.SUBSCRIBE); setSubscriptionType(SubscriptionType.SUBSCRIBE);
...@@ -485,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -485,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
try { try {
if (topic == null || topic.equals("")) { if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty."); throw new IllegalArgumentException("Topic can not be null or empty.");
} }
setSubscriptionType(SubscriptionType.SUBSCRIBE); setSubscriptionType(SubscriptionType.SUBSCRIBE);
...@@ -741,7 +741,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -741,7 +741,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return; return;
} }
if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) { if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
...@@ -797,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -797,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long pullDelayTimeMills = 0; long pullDelayTimeMills = 0;
try { try {
SubscriptionData subscriptionData; SubscriptionData subscriptionData;
String topic = this.messageQueue.getTopic();
if (subscriptionType == SubscriptionType.SUBSCRIBE) { if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic();
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else { } else {
String topic = this.messageQueue.getTopic();
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册