From 3530b76e7bd1a493ac302948378b7d080430e2ec Mon Sep 17 00:00:00 2001 From: "maowei.ymw" Date: Fri, 2 Nov 2018 17:03:04 +0800 Subject: [PATCH] MQPullConsumer support MessageSelector --- .../consumer/DefaultMQPullConsumer.java | 26 +++++ .../client/consumer/MQPullConsumer.java | 47 +++++++++ .../consumer/DefaultMQPullConsumerImpl.java | 95 ++++++++++++++----- .../client/impl/consumer/PullAPIWrapper.java | 29 +----- .../client/impl/factory/MQClientInstance.java | 13 +++ 5 files changed, 157 insertions(+), 53 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index cd706703..6befbf3b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -257,6 +257,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout); } + @Override + public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums); + } + + @Override + public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout); + } + @Override public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { @@ -270,6 +282,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout); } + @Override + public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback); + } + + @Override + public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout); + } + @Override public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java index 33002c98..28b807c2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -66,6 +66,39 @@ public interface MQPullConsumer extends MQConsumer { final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + /** + * Pulling the messages, not blocking + *

+ * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92} + *

+ * + * @param mq from which message queue + * @param selector message selector({@link MessageSelector}), can be null. + * @param offset from where to pull + * @param maxNums max pulling numbers + * @return The resulting {@code PullRequest} + */ + PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset, + final int maxNums) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + /** + * Pulling the messages in the specified timeout + *

+ * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92} + *

+ * + * @param mq from which message queue + * @param selector message selector({@link MessageSelector}), can be null. + * @param offset from where to pull + * @param maxNums max pulling numbers + * @param timeout Pulling the messages in the specified timeout + * @return The resulting {@code PullRequest} + */ + PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset, + final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + /** * Pulling the messages in a async. way */ @@ -80,6 +113,20 @@ public interface MQPullConsumer extends MQConsumer { final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, InterruptedException; + /** + * Pulling the messages in a async. way. Support message selection + */ + void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums, + final PullCallback pullCallback) throws MQClientException, RemotingException, + InterruptedException; + + /** + * Pulling the messages in a async. way. Support message selection + */ + void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums, + final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, + InterruptedException; + /** * Pulling the messages,if no message arrival,blocking some time * diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 420d89b2..1804df20 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -158,17 +159,58 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout); + SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout); } - private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, + public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); + } + + public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout); + } + + private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression) + throws MQClientException { + + if (null == mq) { + throw new MQClientException("mq is null", null); + } + + try { + return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), + mq.getTopic(), subExpression); + } catch (Exception e) { + throw new MQClientException("parse subscription error", e); + } + } + + private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector) + throws MQClientException { + + if (null == mq) { + throw new MQClientException("mq is null", null); + } + + try { + return FilterAPI.build(mq.getTopic(), + messageSelector.getExpression(), messageSelector.getExpressionType()); + } catch (Exception e) { + throw new MQClientException("parse subscription error", e); + } + } + + private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); if (null == mq) { throw new MQClientException("mq is null", null); - } if (offset < 0) { @@ -183,20 +225,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); - SubscriptionData subscriptionData; - try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), - mq.getTopic(), subExpression); - } catch (Exception e) { - throw new MQClientException("parse subscription error", e); - } - long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), - 0L, + subscriptionData.getExpressionType(), + subscriptionData.getSubVersion(), offset, maxNums, sysFlag, @@ -369,12 +404,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { - this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout); + SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); + this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout); + } + + public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback) + throws MQClientException, RemotingException, InterruptedException { + pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); + } + + public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback, + long timeout) + throws MQClientException, RemotingException, InterruptedException { + SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector); + this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout); } private void pullAsyncImpl( final MessageQueue mq, - final String subExpression, + final SubscriptionData subscriptionData, final long offset, final int maxNums, final PullCallback pullCallback, @@ -403,20 +453,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { try { int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); - final SubscriptionData subscriptionData; - try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), - mq.getTopic(), subExpression); - } catch (Exception e) { - throw new MQClientException("parse subscription error", e); - } - long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), - 0L, + subscriptionData.getExpressionType(), + subscriptionData.getSubVersion(), offset, maxNums, sysFlag, @@ -444,7 +487,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); + SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } public DefaultMQPullConsumer getDefaultMQPullConsumer() { @@ -454,7 +498,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { - this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, + SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); + this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index b650e35e..ac44df43 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -163,6 +163,7 @@ public class PullAPIWrapper { this.recalculatePullFromWhichNode(mq), false); } + if (findBrokerResult != null) { { // check version @@ -209,34 +210,6 @@ public class PullAPIWrapper { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } - public PullResult pullKernelImpl( - final MessageQueue mq, - final String subExpression, - final long subVersion, - final long offset, - final int maxNums, - final int sysFlag, - final long commitOffset, - final long brokerSuspendMaxTimeMillis, - final long timeoutMillis, - final CommunicationMode communicationMode, - final PullCallback pullCallback - ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return pullKernelImpl( - mq, - subExpression, - ExpressionType.TAG, - subVersion, offset, - maxNums, - sysFlag, - commitOffset, - brokerSuspendMaxTimeMillis, - timeoutMillis, - communicationMode, - pullCallback - ); - } - public long recalculatePullFromWhichNode(final MessageQueue mq) { if (this.isConnectBrokerByUser()) { return this.defaultBrokerId; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 9ffaed0a..51722111 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1046,6 +1046,19 @@ public class MQClientInstance { if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) { return this.brokerVersionTable.get(brokerName).get(brokerAddr); } + }else{ + HeartbeatData heartbeatData = prepareHeartbeatData(); + try { + int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000); + return version; + } catch (Exception e) { + if (this.isBrokerInNameServer(brokerAddr)) { + log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr); + } else { + log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName, + brokerAddr); + } + } } return 0; } -- GitLab