提交 28936fa1 编写于 作者: Q qqeasonchen

add reply interface to consumer

上级 ec1f9266
...@@ -25,7 +25,10 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore; ...@@ -25,7 +25,10 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
...@@ -444,4 +447,50 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume ...@@ -444,4 +447,50 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
public void setMaxReconsumeTimes(final int maxReconsumeTimes) { public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes; this.maxReconsumeTimes = maxReconsumeTimes;
} }
/**
* send a reply message to the producer of the original request message
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param timeoutMillis
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.defaultMQPullConsumerImpl.reply(requestMsg, replyContent, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message asynchronously
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param replyCallback callback to execute on replying completion.
* @param timeoutMillis
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultMQPullConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message oneway
* @param requestMsg original request message
* @param replyContent contents of reply message
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultMQPullConsumerImpl.replyOneway(requestMsg, replyContent);
}
} }
...@@ -30,12 +30,15 @@ import org.apache.rocketmq.client.exception.MQBrokerException; ...@@ -30,12 +30,15 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
...@@ -889,4 +892,50 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -889,4 +892,50 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public TraceDispatcher getTraceDispatcher() { public TraceDispatcher getTraceDispatcher() {
return traceDispatcher; return traceDispatcher;
} }
/**
* send a reply message to the producer of the original request message
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param timeoutMillis
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.defaultMQPushConsumerImpl.reply(requestMsg, replyContent, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message asynchronously
* @param requestMsg original request message
* @param replyContent contents of reply message
* @param replyCallback callback to execute on replying completion.
* @param timeoutMillis
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback replyCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultMQPushConsumerImpl.reply(requestMsg, replyContent, replyCallback, timeoutMillis);
}
/**
* send a reply message to the producer of the original request message oneway
* @param requestMsg original request message
* @param replyContent contents of reply message
* @throws InterruptedException if the thread is interrupted.
* @throws RemotingException if there is any network-tier error.
* @throws MQClientException if there is any client error.
* @throws MQBrokerException if there is any broker error.
*/
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.defaultMQPushConsumerImpl.replyOneway(requestMsg, replyContent);
}
} }
...@@ -268,13 +268,14 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { ...@@ -268,13 +268,14 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
log.debug("receive reply message :{}", msg); log.debug("receive reply message :{}", msg);
processReplyMessage(msg); processReplyMessage(msg);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} catch (Exception e) { } catch (Exception e) {
log.warn("unknown err when receiveRRReplyMsg", e); log.warn("unknown err when receiveReplyMsg", e);
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("process reply message fail"); response.setRemark("process reply message fail");
} }
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response; return response;
} }
......
...@@ -43,6 +43,9 @@ import org.apache.rocketmq.client.impl.CommunicationMode; ...@@ -43,6 +43,9 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
...@@ -796,4 +799,25 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -796,4 +799,25 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public RebalanceImpl getRebalanceImpl() { public RebalanceImpl getRebalanceImpl() {
return rebalanceImpl; return rebalanceImpl;
} }
@Override
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis);
}
@Override
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis);
}
@Override
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage);
}
} }
...@@ -27,7 +27,6 @@ import java.util.Map.Entry; ...@@ -27,7 +27,6 @@ import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
...@@ -51,20 +50,22 @@ import org.apache.rocketmq.client.impl.CommunicationMode; ...@@ -51,20 +50,22 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.stat.ConsumerStatsManager; import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
...@@ -75,6 +76,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; ...@@ -75,6 +76,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
...@@ -1168,4 +1170,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1168,4 +1170,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.consumeMessageService = consumeMessageService; this.consumeMessageService = consumeMessageService;
} }
@Override
public SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
return this.mQClientFactory.getDefaultMQProducer().send(replyMessage, timeoutMillis);
}
@Override
public void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().send(replyMessage, sendCallback, timeoutMillis);
}
@Override
public void replyOneway(final Message requestMsg,
final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException {
Message replyMessage = MessageUtil.createReplyMessage(requestMsg, replyContent);
this.mQClientFactory.getDefaultMQProducer().sendOneway(replyMessage);
}
} }
...@@ -17,12 +17,18 @@ ...@@ -17,12 +17,18 @@
package org.apache.rocketmq.client.impl.consumer; package org.apache.rocketmq.client.impl.consumer;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.exception.RemotingException;
/** /**
* Consumer inner interface * Consumer inner interface
...@@ -49,4 +55,13 @@ public interface MQConsumerInner { ...@@ -49,4 +55,13 @@ public interface MQConsumerInner {
boolean isUnitMode(); boolean isUnitMode();
ConsumerRunningInfo consumerRunningInfo(); ConsumerRunningInfo consumerRunningInfo();
SendResult reply(final Message requestMsg, final byte[] replyContent,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
void reply(final Message requestMsg, final byte[] replyContent, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
void replyOneway(final Message requestMsg,
final byte[] replyContent) throws RemotingException, MQClientException, InterruptedException;
} }
...@@ -1334,6 +1334,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1334,6 +1334,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public Message request(Message msg, public Message request(Message msg,
long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException { long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
...@@ -1341,6 +1342,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1341,6 +1342,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
...@@ -1353,11 +1355,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1353,11 +1355,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestResponseFuture.putResponseMessage(null); requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e); requestResponseFuture.setCause(e);
} }
}, timeout); }, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout); Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) { if (responseMessage == null) {
if (requestResponseFuture.isSendReqeustOk()) { if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else { } else {
...@@ -1371,6 +1373,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1371,6 +1373,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
public void request(Message msg, final RequestCallback requestCallback, long timeout) throws RemotingException { public void request(Message msg, final RequestCallback requestCallback, long timeout) throws RemotingException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
...@@ -1378,6 +1381,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1378,6 +1381,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
...@@ -1389,7 +1393,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1389,7 +1393,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestResponseFuture.setCause(e); requestResponseFuture.setCause(e);
requestFail(requestUniqId); requestFail(requestUniqId);
} }
}, timeout); }, timeout - cost);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex); log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex); throw new RemotingSendRequestException(msg.getTopic(), ex);
...@@ -1399,6 +1403,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1399,6 +1403,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException, RequestTimeoutException { InterruptedException, RequestTimeoutException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
...@@ -1406,6 +1411,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1406,6 +1411,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
...@@ -1418,11 +1424,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1418,11 +1424,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestResponseFuture.putResponseMessage(null); requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e); requestResponseFuture.setCause(e);
} }
}, timeout); }, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout); Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) { if (responseMessage == null) {
if (requestResponseFuture.isSendReqeustOk()) { if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else { } else {
...@@ -1437,6 +1443,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1437,6 +1443,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void request(final Message msg, final MessageQueueSelector selector, final Object arg, public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout) throws RemotingException { final RequestCallback requestCallback, final long timeout) throws RemotingException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
...@@ -1444,6 +1451,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1444,6 +1451,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
...@@ -1455,7 +1463,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1455,7 +1463,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestResponseFuture.setCause(e); requestResponseFuture.setCause(e);
requestFail(requestUniqId); requestFail(requestUniqId);
} }
}, timeout); }, timeout - cost);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex); log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex); throw new RemotingSendRequestException(msg.getTopic(), ex);
...@@ -1464,6 +1472,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1464,6 +1472,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public Message request(final Message msg, final MessageQueue mq, final long timeout) public Message request(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
...@@ -1471,6 +1480,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1471,6 +1480,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
...@@ -1483,11 +1493,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1483,11 +1493,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestResponseFuture.putResponseMessage(null); requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e); requestResponseFuture.setCause(e);
} }
}, null, timeout); }, null, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout); Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) { if (responseMessage == null) {
if (requestResponseFuture.isSendReqeustOk()) { if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else { } else {
...@@ -1502,6 +1512,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1502,6 +1512,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RemotingException { throws RemotingException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
...@@ -1509,6 +1520,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1509,6 +1520,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
...@@ -1520,7 +1532,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1520,7 +1532,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestResponseFuture.setCause(e); requestResponseFuture.setCause(e);
requestFail(requestUniqId); requestFail(requestUniqId);
} }
}, null, timeout); }, null, timeout - cost);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex); log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex); throw new RemotingSendRequestException(msg.getTopic(), ex);
...@@ -1559,20 +1571,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1559,20 +1571,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
} }
private SendResult reply(final Message msg,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeoutMillis);
}
private SendResult reply(final Message msg, final SendCallback sendCallback,
long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeoutMillis);
}
public void replyOneway(final Message msg) throws RemotingException, MQClientException, InterruptedException {
this.sendOneway(msg);
}
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() { public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable; return topicPublishInfoTable;
} }
......
...@@ -566,13 +566,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -566,13 +566,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} }
/** /**
* @param msg * Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message. </p>
* @param timeout *
* @return * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* @throws MQClientException * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
* @throws RemotingException * delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
* @throws MQBrokerException *
* @throws InterruptedException * @param msg request message to send
* @param timeout request timeout
* @return reply message
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
*/ */
@Override @Override
public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
...@@ -582,85 +588,101 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -582,85 +588,101 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} }
/** /**
* @param msg * Request asynchronously. </p>
* @param requestCallback * This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p>
* @param timeout *
* @return * Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
* @throws MQClientException * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
* @throws RemotingException * application developers are the one to resolve this potential issue.
* @throws InterruptedException *
* @param msg request message to send
* @param requestCallback callback to execute on request completion.
* @param timeout request timeout
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
*/ */
@Override @Override
public void request(final Message msg, final RequestCallback requestCallback, final long timeout) public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException, RequestTimeoutException { throws MQClientException, RemotingException, InterruptedException, MQBrokerException, RequestTimeoutException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, requestCallback, timeout); this.defaultMQProducerImpl.request(msg, requestCallback, timeout);
} }
/** /**
* @param msg * Same to {@link #request(Message, long)} with message queue selector specified.
* @param selector *
* @param arg * @param msg request message to send
* @param timeout * @param selector message queue selector, through which we get target message queue to deliver message to.
* @return * @param arg argument to work along with message queue selector.
* @throws MQClientException * @param timeout timeout of request.
* @throws RemotingException * @return reply message
* @throws MQBrokerException * @throws MQClientException if there is any client error.
* @throws InterruptedException * @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
*/ */
@Override @Override
public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException, RequestTimeoutException { InterruptedException, RequestTimeoutException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.request(msg, selector, arg, timeout); return this.defaultMQProducerImpl.request(msg, selector, arg, timeout);
} }
/** /**
* @param msg * Same to {@link #request(Message, RequestCallback, long)} with target message selector specified.
* @param selector *
* @param arg * @param msg requst message to send
* @param requestCallback * @param selector message queue selector, through which we get target message queue to deliver message to.
* @param timeout * @param arg argument to work along with message queue selector.
* @return * @param requestCallback callback to execute on request completion.
* @throws MQClientException * @param timeout timeout of request.
* @throws RemotingException * @throws MQClientException if there is any client error.
* @throws InterruptedException * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
*/ */
@Override @Override
public void request(final Message msg, final MessageQueueSelector selector, final Object arg, public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException, final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException,
InterruptedException { InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout); this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout);
} }
/** /**
* @param msg * Same to {@link #request(Message, long)} with target message queue specified in addition.
* @param mq *
* @param timeout * @param msg request message to send
* @return * @param mq target message queue.
* @throws MQClientException * @param timeout request timeout
* @throws RemotingException * @throws MQClientException if there is any client error.
* @throws MQBrokerException * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException * @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
*/ */
@Override @Override
public Message request(final Message msg, final MessageQueue mq, final long timeout) public Message request(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.request(msg, mq, timeout); return this.defaultMQProducerImpl.request(msg, mq, timeout);
} }
/** /**
* @param msg * Same to {@link #request(Message, RequestCallback, long)} with target message queue specified.
* @param mq *
* @param requestCallback * @param msg request message to send
* @param timeout * @param mq target message queue.
* @return * @param requestCallback callback to execute on request completion.
* @throws MQClientException * @param timeout timeout of request.
* @throws RemotingException * @throws MQClientException if there is any client error.
* @throws InterruptedException * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
*/ */
@Override @Override
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout); this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout);
} }
......
...@@ -19,7 +19,6 @@ package org.apache.rocketmq.client.producer; ...@@ -19,7 +19,6 @@ package org.apache.rocketmq.client.producer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
public class RequestResponseFuture { public class RequestResponseFuture {
...@@ -29,9 +28,8 @@ public class RequestResponseFuture { ...@@ -29,9 +28,8 @@ public class RequestResponseFuture {
private final Message requestMsg = null; private final Message requestMsg = null;
private long timeoutMillis; private long timeoutMillis;
private CountDownLatch countDownLatch = new CountDownLatch(1); private CountDownLatch countDownLatch = new CountDownLatch(1);
private AtomicBoolean ececuteCallbackOnlyOnce = new AtomicBoolean(false);
private volatile Message responseMsg = null; private volatile Message responseMsg = null;
private volatile boolean sendReqeustOk = true; private volatile boolean sendRequestOk = true;
private volatile Throwable cause = null; private volatile Throwable cause = null;
public RequestResponseFuture(String requestUniqId, long timeoutMillis, RequestCallback requestCallback) { public RequestResponseFuture(String requestUniqId, long timeoutMillis, RequestCallback requestCallback) {
...@@ -42,7 +40,7 @@ public class RequestResponseFuture { ...@@ -42,7 +40,7 @@ public class RequestResponseFuture {
public void executeRequestCallback() { public void executeRequestCallback() {
if (requestCallback != null) { if (requestCallback != null) {
if (sendReqeustOk && cause == null) { if (sendRequestOk && cause == null) {
requestCallback.onSuccess(responseMsg); requestCallback.onSuccess(responseMsg);
} else { } else {
requestCallback.onException(cause); requestCallback.onException(cause);
...@@ -93,14 +91,6 @@ public class RequestResponseFuture { ...@@ -93,14 +91,6 @@ public class RequestResponseFuture {
this.countDownLatch = countDownLatch; this.countDownLatch = countDownLatch;
} }
public AtomicBoolean getEcecuteCallbackOnlyOnce() {
return ececuteCallbackOnlyOnce;
}
public void setEcecuteCallbackOnlyOnce(AtomicBoolean ececuteCallbackOnlyOnce) {
this.ececuteCallbackOnlyOnce = ececuteCallbackOnlyOnce;
}
public Message getResponseMsg() { public Message getResponseMsg() {
return responseMsg; return responseMsg;
} }
...@@ -109,12 +99,12 @@ public class RequestResponseFuture { ...@@ -109,12 +99,12 @@ public class RequestResponseFuture {
this.responseMsg = responseMsg; this.responseMsg = responseMsg;
} }
public boolean isSendReqeustOk() { public boolean isSendRequestOk() {
return sendReqeustOk; return sendRequestOk;
} }
public void setSendReqeustOk(boolean sendReqeustOk) { public void setSendReqeustOk(boolean sendReqeustOk) {
this.sendReqeustOk = sendReqeustOk; this.sendRequestOk = sendReqeustOk;
} }
public Message getRequestMsg() { public Message getRequestMsg() {
......
...@@ -41,8 +41,10 @@ public class MessageUtil { ...@@ -41,8 +41,10 @@ public class MessageUtil {
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl); MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);
return replyMessage; return replyMessage;
} else {
throw new MQClientException(-1, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
} }
} }
throw new MQClientException(-1, "create reply message fail."); throw new MQClientException(-1, "create reply message fail, requestMessage cannot be null.");
} }
} }
...@@ -18,15 +18,11 @@ ...@@ -18,15 +18,11 @@
package org.apache.rocketmq.example.rpc; package org.apache.rocketmq.example.rpc;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RequestProducer { public class RequestProducer {
private static final InternalLogger log = ClientLogger.getLog();
public static void main(String[] args) throws MQClientException, InterruptedException { public static void main(String[] args) throws MQClientException, InterruptedException {
String producerGroup = "please_rename_unique_group_name"; String producerGroup = "please_rename_unique_group_name";
String topic = "RequestTopic"; String topic = "RequestTopic";
...@@ -43,9 +39,9 @@ public class RequestProducer { ...@@ -43,9 +39,9 @@ public class RequestProducer {
long begin = System.currentTimeMillis(); long begin = System.currentTimeMillis();
Message retMsg = producer.request(msg, ttl); Message retMsg = producer.request(msg, ttl);
long cost = System.currentTimeMillis() - begin; long cost = System.currentTimeMillis() - begin;
System.err.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg); System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
} catch (Exception e) { } catch (Exception e) {
log.warn("", e); e.printStackTrace();
} }
producer.shutdown(); producer.shutdown();
} }
......
...@@ -24,19 +24,13 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; ...@@ -24,19 +24,13 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
public class ResponseConsumer { public class ResponseConsumer {
private static final InternalLogger log = ClientLogger.getLog();
public static void main(String[] args) throws InterruptedException, MQClientException { public static void main(String[] args) throws InterruptedException, MQClientException {
String consumerGroup = "please_rename_unique_group_name"; String consumerGroup = "please_rename_unique_group_name";
String topic = "RequestTopic"; String topic = "RequestTopic";
...@@ -50,23 +44,13 @@ public class ResponseConsumer { ...@@ -50,23 +44,13 @@ public class ResponseConsumer {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt msg : msgs) { for (MessageExt msg : msgs) {
try { try {
log.info("handle message: {} body={}", msg, new String(msg.getBody())); System.out.printf("handle message: %s", msg.toString());
String replyTo = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); String replyTo = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO);
//You must use MessageUtil to creage reply message, otherwise reply message maybe wrong.
byte[] replyContent = "reply message contents.".getBytes(); byte[] replyContent = "reply message contents.".getBytes();
Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent); SendResult replyResult = consumer.reply(msg, replyContent, 3000);
System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
//maybe you should create a producer to send reply message. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
SendResult sendResult = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, 3000);
System.out.printf("reply msg %s to %s , %s %n", replyMessage.toString(), replyTo, sendResult.toString());
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册