diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 99976d55d3ad90bd5514008b201482ca726458fd..460558e40758c3749bd8d962653b75514e8f9718 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -22,13 +22,18 @@ import java.util.List; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { @@ -411,4 +416,50 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) { this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis; } + + /** + * send a reply message to the producer of the original request message + * @param requestMsg original request message + * @param replyContent contents of reply message + * @param timeoutMillis + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * @throws InterruptedException if the thread is interrupted. + * @throws RemotingException if there is any network-tier error. + * @throws MQClientException if there is any client error. + * @throws MQBrokerException if there is any broker error. + */ + public SendResult reply(final Message requestMsg, final byte[] replyContent, + long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + return this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, timeoutMillis); + } + + /** + * send a reply message to the producer of the original request message asynchronously + * @param requestMsg original request message + * @param replyContent contents of reply message + * @param replyCallback callback to execute on replying completion. + * @param timeoutMillis + * @throws InterruptedException if the thread is interrupted. + * @throws RemotingException if there is any network-tier error. + * @throws MQClientException if there is any client error. + * @throws MQBrokerException if there is any broker error. + */ + public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback, + long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis); + } + + /** + * send a reply message to the producer of the original request message oneway + * @param requestMsg original request message + * @param replyContent contents of reply message + * @throws InterruptedException if the thread is interrupted. + * @throws RemotingException if there is any network-tier error. + * @throws MQClientException if there is any client error. + * @throws MQBrokerException if there is any broker error. + */ + public void replyOneway(final Message requestMsg, + final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + this.defaultLitePullConsumerImpl.replyOneway(requestMsg, replyContent); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index a37c3a0149047a7313e9cc2f3a92e3b4bc0a07c4..9308fad7c755301b3e6d06f5d5a306acb00d3b8a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -53,6 +53,9 @@ import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.utils.MessageUtil; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -60,6 +63,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; @@ -1070,4 +1074,25 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } + + @Override + public SendResult reply(final Message requestMsg, final byte[] replyContent, + long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); + return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis); + } + + @Override + public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback, + long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); + this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis); + } + + @Override + public void replyOneway(final Message requestMsg, + final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException { + Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent); + this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage); + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java index a722fa6cbb52b1b7b509140095a6461669cbecb3..406979c9f83c8a3f958fc8668a9e8c327ac99c2c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java @@ -45,7 +45,7 @@ public class MessageUtilsTest { Message msg = MessageUtil.createReplyMessage(createReplyMessage(null), new byte[] {'a'}); failBecauseExceptionWasNotThrown(MQClientException.class); } catch (MQClientException e) { - assertThat(e).hasMessageContaining("create reply message fail."); + assertThat(e).hasMessageContaining("create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null."); } }