提交 713bb988 编写于 作者: V vongosling

Refactor the protection logic when pulling

上级 b39031a7
...@@ -94,13 +94,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -94,13 +94,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
} }
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
} }
private void makeSureStateOK() throws MQClientException { private void isRunning() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) { if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, " throw new MQClientException("The consumer is not in running status, "
+ this.serviceState + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null); null);
...@@ -108,12 +108,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -108,12 +108,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
} }
public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException { public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
return this.offsetStore.readOffset(mq, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE); return this.offsetStore.readOffset(mq, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
} }
public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException { public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
if (null == topic) { if (null == topic) {
throw new IllegalArgumentException("topic is null"); throw new IllegalArgumentException("topic is null");
} }
...@@ -130,12 +130,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -130,12 +130,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
} }
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic); return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
} }
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
// check if has info in memory, otherwise invoke api. // check if has info in memory, otherwise invoke api.
Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
if (null == result) { if (null == result) {
...@@ -156,17 +156,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -156,17 +156,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
} }
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
} }
public long maxOffset(MessageQueue mq) throws MQClientException { public long maxOffset(MessageQueue mq) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} }
public long minOffset(MessageQueue mq) throws MQClientException { public long minOffset(MessageQueue mq) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
return this.mQClientFactory.getMQAdminImpl().minOffset(mq); return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
} }
...@@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
long timeout) long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK(); this.isRunning();
if (null == mq) { if (null == mq) {
throw new MQClientException("mq is null", null); throw new MQClientException("mq is null", null);
...@@ -383,7 +383,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -383,7 +383,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
@Override @Override
public void persistConsumerOffset() { public void persistConsumerOffset() {
try { try {
this.makeSureStateOK(); this.isRunning();
Set<MessageQueue> mqs = new HashSet<MessageQueue>(); Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq); mqs.addAll(allocateMq);
...@@ -466,7 +466,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -466,7 +466,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
final PullCallback pullCallback, final PullCallback pullCallback,
final boolean block, final boolean block,
final long timeout) throws MQClientException, RemotingException, InterruptedException { final long timeout) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK(); this.isRunning();
if (null == mq) { if (null == mq) {
throw new MQClientException("mq is null", null); throw new MQClientException("mq is null", null);
...@@ -543,18 +543,18 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -543,18 +543,18 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException { throws MQClientException, InterruptedException {
this.makeSureStateOK(); this.isRunning();
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
} }
public MessageExt queryMessageByUniqKey(String topic, String uniqKey) public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
throws MQClientException, InterruptedException { throws MQClientException, InterruptedException {
this.makeSureStateOK(); this.isRunning();
return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
} }
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} }
...@@ -748,13 +748,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -748,13 +748,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
} }
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException { public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
this.makeSureStateOK(); this.isRunning();
this.offsetStore.updateOffset(mq, offset, false); this.offsetStore.updateOffset(mq, offset, false);
} }
public MessageExt viewMessage(String msgId) public MessageExt viewMessage(String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.makeSureStateOK(); this.isRunning();
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册