提交 18e44c2b 编写于 作者: Q qqeasonchen

remove reply interface in consumer

上级 8353551d
......@@ -22,18 +22,13 @@ import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
......@@ -416,50 +411,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) {
this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
}
/**
* send a reply message to the producer of the original request message
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param timeoutMillis
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message asynchronously
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param replyCallback callback to execute on replying completion.
* @param timeoutMillis
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultLitePullConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message oneway
* @param requestMsg original request message
* @param replyContent contents of reply message
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultLitePullConsumerImpl.replyOneway(requestMsg, replyContent);
}
}
......@@ -25,10 +25,7 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
......@@ -451,50 +448,4 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
/**
* send a reply message to the producer of the original request message
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param timeoutMillis
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.defaultMQPullConsumerImpl.reply(requestMsg, replyContent, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message asynchronously
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param replyCallback callback to execute on replying completion.
* @param timeoutMillis
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultMQPullConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message oneway
* @param requestMsg original request message
* @param replyContent contents of reply message
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultMQPullConsumerImpl.replyOneway(requestMsg, replyContent);
}
}
......@@ -30,15 +30,12 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
......@@ -892,50 +889,4 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
/**
* send a reply message to the producer of the original request message
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param timeoutMillis
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.defaultMQPushConsumerImpl.reply(requestMsg, replyContent, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message asynchronously
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param replyCallback callback to execute on replying completion.
* @param timeoutMillis
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultMQPushConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message oneway
* @param requestMsg original request message
* @param replyContent contents of reply message
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultMQPushConsumerImpl.replyOneway(requestMsg, replyContent);
}
}
......@@ -53,9 +53,6 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
......@@ -63,7 +60,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
......@@ -1079,25 +1075,4 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
@Override
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis);
}
@Override
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis);
}
@Override
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage);
}
}
......@@ -43,9 +43,6 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
......@@ -804,25 +801,4 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public RebalanceImpl getRebalanceImpl() {
return rebalanceImpl;
}
@Override
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis);
}
@Override
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis);
}
@Override
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage);
}
}
......@@ -27,6 +27,7 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
......@@ -50,22 +51,20 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
......@@ -76,7 +75,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -1175,25 +1173,4 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
@Override
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis);
}
@Override
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis);
}
@Override
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage);
}
}
......@@ -17,18 +17,12 @@
package org.apache.rocketmq.client.impl.consumer;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Consumer inner interface
......@@ -55,13 +49,4 @@ public interface MQConsumerInner {
boolean isUnitMode();
ConsumerRunningInfo consumerRunningInfo();
SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
void replyOneway(final Message requestMsg,
final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException;
}
......@@ -19,16 +19,12 @@ package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
......@@ -37,7 +33,6 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
......@@ -51,23 +46,11 @@ import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Assert;
......@@ -83,10 +66,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
......@@ -142,10 +123,6 @@ public class DefaultMQPushConsumerTest {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(mQClientFactory.getDefaultMQProducer().getDefaultMQProducerImpl(), mQClientFactory);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
......@@ -279,64 +256,6 @@ public class DefaultMQPushConsumerTest {
}
}
@Test
public void testReplyMessageSync() throws RemotingException, InterruptedException, MQBrokerException, MQClientException, NoSuchFieldException {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendResult sendResult = pushConsumer.reply(createRequestMessage(), new byte[]{'a'}, 3000);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("12345");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Test
public void testReplyMessageASync() throws RemotingException, InterruptedException, MQBrokerException, MQClientException, NoSuchFieldException {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenAnswer(new Answer<Object>() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
CommunicationMode communicationMode = mock.getArgument(5);
SendCallback sendCallback = mock.getArgument(6);
SendResult sendResult = createSendResult(SendStatus.SEND_OK);
switch (communicationMode) {
case SYNC:
return sendCallback;
case ASYNC:
case ONEWAY:
if (sendCallback != null) {
sendCallback.onSuccess(sendResult);
}
}
return null;
}
});
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicBoolean replySuccess = new AtomicBoolean(false);
pushConsumer.reply(createRequestMessage(), new byte[] {'a'}, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("12345");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
countDownLatch.countDown();
replySuccess.set(true);
}
@Override public void onException(Throwable e) {
countDownLatch.countDown();
}
}, 3000);
countDownLatch.await(3000, TimeUnit.MILLISECONDS);
assertTrue(replySuccess.get());
}
private DefaultMQPushConsumer createPushConsumer() {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
......@@ -375,51 +294,4 @@ public class DefaultMQPushConsumerTest {
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("12345");
sendResult.setOffsetMsgId("12345");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private Message createRequestMessage() {
Message requestMessage = new Message();
Map map = new HashMap<String, String>();
map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO, "reply_to_who");
map.put(MessageConst.PROPERTY_CLUSTER, "DefaultCluster");
map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000");
MessageAccessor.setProperties(requestMessage, map);
return requestMessage;
}
}
......@@ -24,20 +24,30 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class ResponseConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
String producerGroup = "please_rename_unique_group_name";
String consumerGroup = "please_rename_unique_group_name";
String topic = "RequestTopic";
// create a producer to send reply message
DefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);
replyProducer.start();
// create consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//recommend client configs
// recommend client configs
consumer.setPullTimeDelayMillsWhenException(0L);
consumer.registerMessageListener(new MessageListenerConcurrently() {
......@@ -48,9 +58,12 @@ public class ResponseConsumer {
try {
System.out.printf("handle message: %s", msg.toString());
String replyTo = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO);
byte[] replyContent = "reply message contents.".getBytes();
SendResult replyResult = consumer.reply(msg, replyContent, 3000);
// create reply message with given util, do not create reply message by yourself
Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);
// send reply message with producer
SendResult replyResult = replyProducer.send(replyMessage, 3000);
System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册