From 61cd395d68db3cd5682060a75fe1ab6e56c0d82d Mon Sep 17 00:00:00 2001 From: qingjiyuji <1114366223@qq.com> Date: Wed, 23 Jun 2021 20:22:03 +0800 Subject: [PATCH] code style(client):polish lite pull consumer code style --- .../consumer/DefaultLitePullConsumerImpl.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 4e139c44..b1caeede 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first."; - private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive."; + private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive."; /** * the type of subscription */ @@ -195,8 +195,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } private void checkServiceState() { - if (this.serviceState != ServiceState.RUNNING) + if (this.serviceState != ServiceState.RUNNING) { throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); + } } public void updateNameServerAddr(String newAddresses) { @@ -204,10 +205,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } private synchronized void setSubscriptionType(SubscriptionType type) { - if (this.subscriptionType == SubscriptionType.NONE) + if (this.subscriptionType == SubscriptionType.NONE) { this.subscriptionType = type; - else if (this.subscriptionType != type) - throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE); + } else if (this.subscriptionType != type) { + throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE); + } } private void updateAssignedMessageQueue(String topic, Set assignedMessageQueue) { @@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized List poll(long timeout) { try { checkServiceState(); - if (timeout < 0) + if (timeout < 0) { throw new IllegalArgumentException("Timeout must not be negative"); + } if (defaultLitePullConsumer.isAutoCommit()) { maybeAutoCommit(); @@ -546,8 +549,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { if (endTime - System.currentTimeMillis() > 0) { while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (endTime - System.currentTimeMillis() <= 0) + if (endTime - System.currentTimeMillis() <= 0) { break; + } } } @@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public long committed(MessageQueue messageQueue) throws MQClientException { checkServiceState(); long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE); - if (offset == -2) + if (offset == -2) { throw new MQClientException("Fetch consume offset from broker exception", null); + } return offset; } @@ -683,8 +688,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } Iterator iter = consumeRequestCache.iterator(); while (iter.hasNext()) { - if (iter.next().getMessageQueue().equals(messageQueue)) + if (iter.next().getMessageQueue().equals(messageQueue)) { iter.remove(); + } } } @@ -737,8 +743,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); - if ((consumeRequestFlowControlTimes++ % 1000) == 0) + if ((consumeRequestFlowControlTimes++ % 1000) == 0) { log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); + } return; } -- GitLab