From 713bb9883b687cc18de388d92fe0c9cfd830cc04 Mon Sep 17 00:00:00 2001 From: vongosling Date: Wed, 31 Jul 2019 11:41:40 +0800 Subject: [PATCH] Refactor the protection logic when pulling --- .../consumer/DefaultMQPullConsumerImpl.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 8aff14b7..d4842980 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -94,13 +94,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); } - private void makeSureStateOK() throws MQClientException { + private void isRunning() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The consumer service state not OK, " + throw new MQClientException("The consumer is not in running status, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); @@ -108,12 +108,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); return this.offsetStore.readOffset(mq, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE); } public Set fetchMessageQueuesInBalance(String topic) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); if (null == topic) { throw new IllegalArgumentException("topic is null"); } @@ -130,12 +130,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public List fetchPublishMessageQueues(String topic) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic); } public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); // check if has info in memory, otherwise invoke api. Set result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); if (null == result) { @@ -156,17 +156,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); } public long maxOffset(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } public long minOffset(MessageQueue mq) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); return this.mQClientFactory.getMQAdminImpl().minOffset(mq); } @@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - this.makeSureStateOK(); + this.isRunning(); if (null == mq) { throw new MQClientException("mq is null", null); @@ -383,7 +383,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { @Override public void persistConsumerOffset() { try { - this.makeSureStateOK(); + this.isRunning(); Set mqs = new HashSet(); Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); mqs.addAll(allocateMq); @@ -466,7 +466,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { final PullCallback pullCallback, final boolean block, final long timeout) throws MQClientException, RemotingException, InterruptedException { - this.makeSureStateOK(); + this.isRunning(); if (null == mq) { throw new MQClientException("mq is null", null); @@ -543,18 +543,18 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { - this.makeSureStateOK(); + this.isRunning(); return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); } public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException, InterruptedException { - this.makeSureStateOK(); + this.isRunning(); return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } @@ -748,13 +748,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException { - this.makeSureStateOK(); + this.isRunning(); this.offsetStore.updateOffset(mq, offset, false); } public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.makeSureStateOK(); + this.isRunning(); return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); } -- GitLab