未验证 提交 6aabf776 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #3047 from qingjiyuji/develop

[ISSUE #3046]:Polish lite pull consumer code style
...@@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first."; private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive."; private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
/** /**
* the type of subscription * the type of subscription
*/ */
...@@ -195,19 +195,21 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -195,19 +195,21 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
private void checkServiceState() { private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING) if (this.serviceState != ServiceState.RUNNING) {
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
} }
}
public void updateNameServerAddr(String newAddresses) { public void updateNameServerAddr(String newAddresses) {
this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses); this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses);
} }
private synchronized void setSubscriptionType(SubscriptionType type) { private synchronized void setSubscriptionType(SubscriptionType type) {
if (this.subscriptionType == SubscriptionType.NONE) if (this.subscriptionType == SubscriptionType.NONE) {
this.subscriptionType = type; this.subscriptionType = type;
else if (this.subscriptionType != type) } else if (this.subscriptionType != type) {
throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE); throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE);
}
} }
private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) { private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
...@@ -464,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -464,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, String subExpression) throws MQClientException { public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try { try {
if (topic == null || topic.equals("")) { if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty."); throw new IllegalArgumentException("Topic can not be null or empty.");
} }
setSubscriptionType(SubscriptionType.SUBSCRIBE); setSubscriptionType(SubscriptionType.SUBSCRIBE);
...@@ -483,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -483,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
try { try {
if (topic == null || topic.equals("")) { if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty."); throw new IllegalArgumentException("Topic can not be null or empty.");
} }
setSubscriptionType(SubscriptionType.SUBSCRIBE); setSubscriptionType(SubscriptionType.SUBSCRIBE);
...@@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized List<MessageExt> poll(long timeout) { public synchronized List<MessageExt> poll(long timeout) {
try { try {
checkServiceState(); checkServiceState();
if (timeout < 0) if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative"); throw new IllegalArgumentException("Timeout must not be negative");
}
if (defaultLitePullConsumer.isAutoCommit()) { if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit(); maybeAutoCommit();
...@@ -546,10 +549,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -546,10 +549,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (endTime - System.currentTimeMillis() > 0) { if (endTime - System.currentTimeMillis() > 0) {
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0) if (endTime - System.currentTimeMillis() <= 0) {
break; break;
} }
} }
}
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
List<MessageExt> messages = consumeRequest.getMessageExts(); List<MessageExt> messages = consumeRequest.getMessageExts();
...@@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public long committed(MessageQueue messageQueue) throws MQClientException { public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState(); checkServiceState();
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE); long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (offset == -2) if (offset == -2) {
throw new MQClientException("Fetch consume offset from broker exception", null); throw new MQClientException("Fetch consume offset from broker exception", null);
}
return offset; return offset;
} }
...@@ -683,10 +688,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -683,10 +688,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
Iterator<ConsumeRequest> iter = consumeRequestCache.iterator(); Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
if (iter.next().getMessageQueue().equals(messageQueue)) if (iter.next().getMessageQueue().equals(messageQueue)) {
iter.remove(); iter.remove();
} }
} }
}
private long nextPullOffset(MessageQueue messageQueue) throws MQClientException { private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
long offset = -1; long offset = -1;
...@@ -735,10 +741,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -735,10 +741,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return; return;
} }
if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
return; return;
} }
...@@ -790,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -790,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long pullDelayTimeMills = 0; long pullDelayTimeMills = 0;
try { try {
SubscriptionData subscriptionData; SubscriptionData subscriptionData;
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic(); String topic = this.messageQueue.getTopic();
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else { } else {
String topic = this.messageQueue.getTopic();
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册