diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index b1caeedee846f3c1d0f0a21f0d33034e5b87d176..e835be1cd5202ce42eb970e42468e1a2524baac1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -466,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized void subscribe(String topic, String subExpression) throws MQClientException { try { - if (topic == null || topic.equals("")) { + if (topic == null || "".equals(topic)) { throw new IllegalArgumentException("Topic can not be null or empty."); } setSubscriptionType(SubscriptionType.SUBSCRIBE); @@ -485,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { try { - if (topic == null || topic.equals("")) { + if (topic == null || "".equals(topic)) { throw new IllegalArgumentException("Topic can not be null or empty."); } setSubscriptionType(SubscriptionType.SUBSCRIBE); @@ -741,7 +741,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return; } - if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { + if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((consumeRequestFlowControlTimes++ % 1000) == 0) { log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); @@ -797,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { long pullDelayTimeMills = 0; try { SubscriptionData subscriptionData; + String topic = this.messageQueue.getTopic(); if (subscriptionType == SubscriptionType.SUBSCRIBE) { - String topic = this.messageQueue.getTopic(); subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); } else { - String topic = this.messageQueue.getTopic(); subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); }