diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 4e139c44ce0c25d0eee8b01bc2f87804a036c8b9..e835be1cd5202ce42eb970e42468e1a2524baac1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first."; - private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive."; + private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive."; /** * the type of subscription */ @@ -195,8 +195,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } private void checkServiceState() { - if (this.serviceState != ServiceState.RUNNING) + if (this.serviceState != ServiceState.RUNNING) { throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); + } } public void updateNameServerAddr(String newAddresses) { @@ -204,10 +205,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } private synchronized void setSubscriptionType(SubscriptionType type) { - if (this.subscriptionType == SubscriptionType.NONE) + if (this.subscriptionType == SubscriptionType.NONE) { this.subscriptionType = type; - else if (this.subscriptionType != type) - throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE); + } else if (this.subscriptionType != type) { + throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE); + } } private void updateAssignedMessageQueue(String topic, Set assignedMessageQueue) { @@ -464,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized void subscribe(String topic, String subExpression) throws MQClientException { try { - if (topic == null || topic.equals("")) { + if (topic == null || "".equals(topic)) { throw new IllegalArgumentException("Topic can not be null or empty."); } setSubscriptionType(SubscriptionType.SUBSCRIBE); @@ -483,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { try { - if (topic == null || topic.equals("")) { + if (topic == null || "".equals(topic)) { throw new IllegalArgumentException("Topic can not be null or empty."); } setSubscriptionType(SubscriptionType.SUBSCRIBE); @@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized List poll(long timeout) { try { checkServiceState(); - if (timeout < 0) + if (timeout < 0) { throw new IllegalArgumentException("Timeout must not be negative"); + } if (defaultLitePullConsumer.isAutoCommit()) { maybeAutoCommit(); @@ -546,8 +549,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { if (endTime - System.currentTimeMillis() > 0) { while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (endTime - System.currentTimeMillis() <= 0) + if (endTime - System.currentTimeMillis() <= 0) { break; + } } } @@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public long committed(MessageQueue messageQueue) throws MQClientException { checkServiceState(); long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE); - if (offset == -2) + if (offset == -2) { throw new MQClientException("Fetch consume offset from broker exception", null); + } return offset; } @@ -683,8 +688,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } Iterator iter = consumeRequestCache.iterator(); while (iter.hasNext()) { - if (iter.next().getMessageQueue().equals(messageQueue)) + if (iter.next().getMessageQueue().equals(messageQueue)) { iter.remove(); + } } } @@ -735,10 +741,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return; } - if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { + if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); - if ((consumeRequestFlowControlTimes++ % 1000) == 0) + if ((consumeRequestFlowControlTimes++ % 1000) == 0) { log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); + } return; } @@ -790,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { long pullDelayTimeMills = 0; try { SubscriptionData subscriptionData; + String topic = this.messageQueue.getTopic(); if (subscriptionType == SubscriptionType.SUBSCRIBE) { - String topic = this.messageQueue.getTopic(); subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); } else { - String topic = this.messageQueue.getTopic(); subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); }