提交 cc7543ff 编写于 作者: Q qqeasonchen

add reply interface to DefaultLitePullConsumer

上级 c434ff43
......@@ -22,13 +22,18 @@ import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
......@@ -411,4 +416,50 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) {
this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
}
/**
* send a reply message to the producer of the original request message
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param timeoutMillis
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message asynchronously
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param replyCallback callback to execute on replying completion.
* @param timeoutMillis
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message oneway
* @param requestMsg original request message
* @param replyContent contents of reply message
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultLitePullConsumerImpl.replyOneway(requestMsg, replyContent);
}
}
......@@ -53,6 +53,9 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
......@@ -60,6 +63,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
......@@ -1070,4 +1074,25 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
@Override
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis);
}
@Override
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis);
}
@Override
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage);
}
}
......@@ -45,7 +45,7 @@ public class MessageUtilsTest {
Message msg = MessageUtil.createReplyMessage(createReplyMessage(null), new byte[] {'a'});
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("create reply message fail.");
assertThat(e).hasMessageContaining("create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册