提交 61cd395d 编写于 作者: qingjiyuji's avatar qingjiyuji

code style(client):polish lite pull consumer code style

上级 160c5772
...@@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first."; private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive."; private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
/** /**
* the type of subscription * the type of subscription
*/ */
...@@ -195,8 +195,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -195,8 +195,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
private void checkServiceState() { private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING) if (this.serviceState != ServiceState.RUNNING) {
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
}
} }
public void updateNameServerAddr(String newAddresses) { public void updateNameServerAddr(String newAddresses) {
...@@ -204,10 +205,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -204,10 +205,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
private synchronized void setSubscriptionType(SubscriptionType type) { private synchronized void setSubscriptionType(SubscriptionType type) {
if (this.subscriptionType == SubscriptionType.NONE) if (this.subscriptionType == SubscriptionType.NONE) {
this.subscriptionType = type; this.subscriptionType = type;
else if (this.subscriptionType != type) } else if (this.subscriptionType != type) {
throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE); throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE);
}
} }
private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) { private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
...@@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized List<MessageExt> poll(long timeout) { public synchronized List<MessageExt> poll(long timeout) {
try { try {
checkServiceState(); checkServiceState();
if (timeout < 0) if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative"); throw new IllegalArgumentException("Timeout must not be negative");
}
if (defaultLitePullConsumer.isAutoCommit()) { if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit(); maybeAutoCommit();
...@@ -546,8 +549,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -546,8 +549,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (endTime - System.currentTimeMillis() > 0) { if (endTime - System.currentTimeMillis() > 0) {
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0) if (endTime - System.currentTimeMillis() <= 0) {
break; break;
}
} }
} }
...@@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public long committed(MessageQueue messageQueue) throws MQClientException { public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState(); checkServiceState();
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE); long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (offset == -2) if (offset == -2) {
throw new MQClientException("Fetch consume offset from broker exception", null); throw new MQClientException("Fetch consume offset from broker exception", null);
}
return offset; return offset;
} }
...@@ -683,8 +688,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -683,8 +688,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
Iterator<ConsumeRequest> iter = consumeRequestCache.iterator(); Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
if (iter.next().getMessageQueue().equals(messageQueue)) if (iter.next().getMessageQueue().equals(messageQueue)) {
iter.remove(); iter.remove();
}
} }
} }
...@@ -737,8 +743,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -737,8 +743,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册