From cc7543ff756c535df0e9d1b395563a2be1e4d1ef Mon Sep 17 00:00:00 2001
From: qqeasonchen <qqeasonchen@gmail.com>
Date: Tue, 24 Sep 2019 21:28:49 +0800
Subject: [PATCH] add reply interface to DefaultLitePullConsumer

---
 .../consumer/DefaultLitePullConsumer.java     | 51 +++++++++++++++++++
 .../consumer/DefaultLitePullConsumerImpl.java | 25 +++++++++
 .../client/utils/MessageUtilsTest.java        |  2 +-
 3 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 99976d55..460558e4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -22,13 +22,18 @@ import java.util.List;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 
 public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
 
@@ -411,4 +416,50 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) {
         this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
     }
+
+    /**
+     *  send a reply message to the producer of the original request message
+     * @param requestMsg original request message
+     * @param replyContent contents of reply message
+     * @param timeoutMillis
+     * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException if there is any client error.
+     * @throws MQBrokerException if there is any broker error.
+     */
+    public SendResult reply(final Message requestMsg, final byte[] replyContent,
+        long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        return this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, timeoutMillis);
+    }
+
+    /**
+     * send a reply message to the producer of the original request message asynchronously
+     * @param requestMsg original request message
+     * @param replyContent contents of reply message
+     * @param replyCallback  callback to execute on replying completion.
+     * @param timeoutMillis
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException if there is any client error.
+     * @throws MQBrokerException if there is any broker error.
+     */
+    public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback,
+        long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis);
+    }
+
+    /**
+     * send a reply message to the producer of the original request message oneway
+     * @param requestMsg original request message
+     * @param replyContent contents of reply message
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException if there is any client error.
+     * @throws MQBrokerException if there is any broker error.
+     */
+    public void replyOneway(final Message requestMsg,
+        final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        this.defaultLitePullConsumerImpl.replyOneway(requestMsg, replyContent);
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index a37c3a01..9308fad7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -53,6 +53,9 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.utils.MessageUtil;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -60,6 +63,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
@@ -1070,4 +1074,25 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
 
     }
+
+    @Override
+    public SendResult reply(final Message requestMsg, final byte[] replyContent,
+        long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
+        return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis);
+    }
+
+    @Override
+    public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
+        long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
+        this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis);
+    }
+
+    @Override
+    public void replyOneway(final Message requestMsg,
+        final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException {
+        Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
+        this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage);
+    }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
index a722fa6c..406979c9 100644
--- a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
@@ -45,7 +45,7 @@ public class MessageUtilsTest {
             Message msg = MessageUtil.createReplyMessage(createReplyMessage(null), new byte[] {'a'});
             failBecauseExceptionWasNotThrown(MQClientException.class);
         } catch (MQClientException e) {
-            assertThat(e).hasMessageContaining("create reply message fail.");
+            assertThat(e).hasMessageContaining("create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
         }
     }
 
-- 
GitLab