From 18e44c2bbc11c60e451e548a90dea1bba590497e Mon Sep 17 00:00:00 2001 From: qqeasonchen Date: Sat, 12 Oct 2019 16:53:54 +0800 Subject: [PATCH] remove reply interface in consumer --- .../consumer/DefaultLitePullConsumer.java | 51 ------- .../consumer/DefaultMQPullConsumer.java | 49 ------- .../consumer/DefaultMQPushConsumer.java | 49 ------- .../consumer/DefaultLitePullConsumerImpl.java | 25 ---- .../consumer/DefaultMQPullConsumerImpl.java | 24 ---- .../consumer/DefaultMQPushConsumerImpl.java | 29 +--- .../client/impl/consumer/MQConsumerInner.java | 15 -- .../consumer/DefaultMQPushConsumerTest.java | 128 ------------------ .../example/rpc/ResponseConsumer.java | 19 ++- 9 files changed, 19 insertions(+), 370 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 460558e4..99976d55 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -22,18 +22,13 @@ import java.util.List; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.store.OffsetStore; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.exception.RemotingException; public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { @@ -416,50 +411,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) { this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis; } - - /** - * send a reply message to the producer of the original request message - * @param requestMsg original request message - * @param replyContent contents of reply message - * @param timeoutMillis - * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public SendResult reply(final Message requestMsg, final byte[] replyContent, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - return this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, timeoutMillis); - } - - /** - * send a reply message to the producer of the original request message asynchronously - * @param requestMsg original request message - * @param replyContent contents of reply message - * @param replyCallback callback to execute on replying completion. - * @param timeoutMillis - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis); - } - - /** - * send a reply message to the producer of the original request message oneway - * @param requestMsg original request message - * @param replyContent contents of reply message - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public void replyOneway(final Message requestMsg, - final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - this.defaultLitePullConsumerImpl.replyOneway(requestMsg, replyContent); - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 9e291b56..0876a94e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -25,10 +25,7 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -451,50 +448,4 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume public void setMaxReconsumeTimes(final int maxReconsumeTimes) { this.maxReconsumeTimes = maxReconsumeTimes; } - - /** - * send a reply message to the producer of the original request message - * @param requestMsg original request message - * @param replyContent contents of reply message - * @param timeoutMillis - * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public SendResult reply(final Message requestMsg, final byte[] replyContent, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - return this.defaultMQPullConsumerImpl.reply(requestMsg, replyContent, timeoutMillis); - } - - /** - * send a reply message to the producer of the original request message asynchronously - * @param requestMsg original request message - * @param replyContent contents of reply message - * @param replyCallback callback to execute on replying completion. - * @param timeoutMillis - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - this.defaultMQPullConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis); - } - - /** - * send a reply message to the producer of the original request message oneway - * @param requestMsg original request message - * @param replyContent contents of reply message - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public void replyOneway(final Message requestMsg, - final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - this.defaultMQPullConsumerImpl.replyOneway(requestMsg, replyContent); - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 8aa12a49..339f799f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -30,15 +30,12 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -892,50 +889,4 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } - - /** - * send a reply message to the producer of the original request message - * @param requestMsg original request message - * @param replyContent contents of reply message - * @param timeoutMillis - * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public SendResult reply(final Message requestMsg, final byte[] replyContent, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - return this.defaultMQPushConsumerImpl.reply(requestMsg, replyContent, timeoutMillis); - } - - /** - * send a reply message to the producer of the original request message asynchronously - * @param requestMsg original request message - * @param replyContent contents of reply message - * @param replyCallback callback to execute on replying completion. - * @param timeoutMillis - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - this.defaultMQPushConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis); - } - - /** - * send a reply message to the producer of the original request message oneway - * @param requestMsg original request message - * @param replyContent contents of reply message - * @throws InterruptedException if the thread is interrupted. - * @throws RemotingException if there is any network-tier error. - * @throws MQClientException if there is any client error. - * @throws MQBrokerException if there is any broker error. - */ - public void replyOneway(final Message requestMsg, - final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - this.defaultMQPushConsumerImpl.replyOneway(requestMsg, replyContent); - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index d094f661..2c673a1b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -53,9 +53,6 @@ import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.utils.MessageUtil; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -63,7 +60,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; @@ -1079,25 +1075,4 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) { this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException; } - - @Override - public SendResult reply(final Message requestMsg, final byte[] replyContent, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis); - } - - @Override - public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis); - } - - @Override - public void replyOneway(final Message requestMsg, - final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage); - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 6c63ff49..afd72a08 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -43,9 +43,6 @@ import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.utils.MessageUtil; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; @@ -804,25 +801,4 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public RebalanceImpl getRebalanceImpl() { return rebalanceImpl; } - - @Override - public SendResult reply(final Message requestMsg, final byte[] replyContent, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis); - } - - @Override - public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis); - } - - @Override - public void replyOneway(final Message requestMsg, - final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage); - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 20c59609..807e9c6d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentMap; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; @@ -50,22 +51,20 @@ import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.stat.ConsumerStatsManager; -import org.apache.rocketmq.client.utils.MessageUtil; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; @@ -76,7 +75,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.PullSysFlag; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -1175,25 +1173,4 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) { this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException; } - - @Override - public SendResult reply(final Message requestMsg, final byte[] replyContent, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis); - } - - @Override - public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis); - } - - @Override - public void replyOneway(final Message requestMsg, - final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException { - Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); - this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage); - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java index 010d1db3..c2e8a1df 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java @@ -17,18 +17,12 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.Set; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.remoting.exception.RemotingException; /** * Consumer inner interface @@ -55,13 +49,4 @@ public interface MQConsumerInner { boolean isUnitMode(); ConsumerRunningInfo consumerRunningInfo(); - - SendResult reply(final Message requestMsg, final byte[] replyContent, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; - - void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback, - long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; - - void replyOneway(final Message requestMsg, - final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException; } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index a2a469eb..e6f0e866 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -19,16 +19,12 @@ package org.apache.rocketmq.client.consumer; import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; @@ -37,7 +33,6 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; @@ -51,23 +46,11 @@ import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; -import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageClientExt; -import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Assert; @@ -83,10 +66,8 @@ import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; @@ -142,10 +123,6 @@ public class DefaultMQPushConsumerTest { field.setAccessible(true); field.set(mQClientFactory, mQClientAPIImpl); - field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); - field.setAccessible(true); - field.set(mQClientFactory.getDefaultMQProducer().getDefaultMQProducerImpl(), mQClientFactory); - pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false)); field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper"); field.setAccessible(true); @@ -279,64 +256,6 @@ public class DefaultMQPushConsumerTest { } } - @Test - public void testReplyMessageSync() throws RemotingException, InterruptedException, MQBrokerException, MQClientException, NoSuchFieldException { - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(createSendResult(SendStatus.SEND_OK)); - - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - - SendResult sendResult = pushConsumer.reply(createRequestMessage(), new byte[]{'a'}, 3000); - assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); - assertThat(sendResult.getOffsetMsgId()).isEqualTo("12345"); - assertThat(sendResult.getQueueOffset()).isEqualTo(456L); - } - - @Test - public void testReplyMessageASync() throws RemotingException, InterruptedException, MQBrokerException, MQClientException, NoSuchFieldException { - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { - CommunicationMode communicationMode = mock.getArgument(5); - SendCallback sendCallback = mock.getArgument(6); - SendResult sendResult = createSendResult(SendStatus.SEND_OK); - switch (communicationMode) { - case SYNC: - return sendCallback; - case ASYNC: - case ONEWAY: - if (sendCallback != null) { - sendCallback.onSuccess(sendResult); - } - } - return null; - } - }); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicBoolean replySuccess = new AtomicBoolean(false); - pushConsumer.reply(createRequestMessage(), new byte[] {'a'}, new SendCallback() { - @Override public void onSuccess(SendResult sendResult) { - assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); - assertThat(sendResult.getOffsetMsgId()).isEqualTo("12345"); - assertThat(sendResult.getQueueOffset()).isEqualTo(456L); - countDownLatch.countDown(); - replySuccess.set(true); - } - - @Override public void onException(Throwable e) { - countDownLatch.countDown(); - } - }, 3000); - countDownLatch.await(3000, TimeUnit.MILLISECONDS); - assertTrue(replySuccess.get()); - } - private DefaultMQPushConsumer createPushConsumer() { DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @@ -375,51 +294,4 @@ public class DefaultMQPushConsumerTest { } return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); } - - private SendResult createSendResult(SendStatus sendStatus) { - SendResult sendResult = new SendResult(); - sendResult.setMsgId("12345"); - sendResult.setOffsetMsgId("12345"); - sendResult.setQueueOffset(456); - sendResult.setSendStatus(sendStatus); - sendResult.setRegionId("HZ"); - return sendResult; - } - - public static TopicRouteData createTopicRoute() { - TopicRouteData topicRouteData = new TopicRouteData(); - - topicRouteData.setFilterServerTable(new HashMap>()); - List brokerDataList = new ArrayList(); - BrokerData brokerData = new BrokerData(); - brokerData.setBrokerName("BrokerA"); - brokerData.setCluster("DefaultCluster"); - HashMap brokerAddrs = new HashMap(); - brokerAddrs.put(0L, "127.0.0.1:10911"); - brokerData.setBrokerAddrs(brokerAddrs); - brokerDataList.add(brokerData); - topicRouteData.setBrokerDatas(brokerDataList); - - List queueDataList = new ArrayList(); - QueueData queueData = new QueueData(); - queueData.setBrokerName("BrokerA"); - queueData.setPerm(6); - queueData.setReadQueueNums(3); - queueData.setWriteQueueNums(4); - queueData.setTopicSynFlag(0); - queueDataList.add(queueData); - topicRouteData.setQueueDatas(queueDataList); - - return topicRouteData; - } - - private Message createRequestMessage() { - Message requestMessage = new Message(); - Map map = new HashMap(); - map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO, "reply_to_who"); - map.put(MessageConst.PROPERTY_CLUSTER, "DefaultCluster"); - map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000"); - MessageAccessor.setProperties(requestMessage, map); - return requestMessage; - } } diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java index f3b3fd11..39012c8b 100644 --- a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java @@ -24,20 +24,30 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.utils.MessageUtil; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.exception.RemotingException; public class ResponseConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { + String producerGroup = "please_rename_unique_group_name"; String consumerGroup = "please_rename_unique_group_name"; String topic = "RequestTopic"; + // create a producer to send reply message + DefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup); + replyProducer.start(); + + // create consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); - //recommend client configs + + // recommend client configs consumer.setPullTimeDelayMillsWhenException(0L); consumer.registerMessageListener(new MessageListenerConcurrently() { @@ -48,9 +58,12 @@ public class ResponseConsumer { try { System.out.printf("handle message: %s", msg.toString()); String replyTo = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); - byte[] replyContent = "reply message contents.".getBytes(); - SendResult replyResult = consumer.reply(msg, replyContent, 3000); + // create reply message with given util, do not create reply message by yourself + Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent); + + // send reply message with producer + SendResult replyResult = replyProducer.send(replyMessage, 3000); System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString()); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); -- GitLab