diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9bc154fd92299f2a1eb471a4478b6495c24a7966..85009d620f573ab97adb149dd417a86237bf3d92 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
+import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -132,6 +133,7 @@ public class BrokerController {
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue sendThreadPoolQueue;
private final BlockingQueue pullThreadPoolQueue;
+ private final BlockingQueue replyThreadPoolQueue;
private final BlockingQueue queryThreadPoolQueue;
private final BlockingQueue clientManagerThreadPoolQueue;
private final BlockingQueue heartbeatThreadPoolQueue;
@@ -147,6 +149,7 @@ public class BrokerController {
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
+ private ExecutorService replyMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
@@ -194,6 +197,7 @@ public class BrokerController {
this.sendThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getPullThreadPoolQueueCapacity());
+ this.replyThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
@@ -277,6 +281,14 @@ public class BrokerController {
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
+ this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+ this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.replyThreadPoolQueue,
+ new ThreadFactoryImpl("ProcessReplyMessageThread_"));
+
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
@@ -553,6 +565,17 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
+ /**
+ * ReplyMessageProcessor
+ */
+ ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
+ replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
+
+ this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
+
/**
* QueryMessageProcessor
*/
@@ -763,6 +786,10 @@ public class BrokerController {
this.pullMessageExecutor.shutdown();
}
+ if (this.replyMessageExecutor != null) {
+ this.replyMessageExecutor.shutdown();
+ }
+
if (this.adminBrokerExecutor != null) {
this.adminBrokerExecutor.shutdown();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index f3eed652ceaede7cb406ff2d17ce386b1ac594bb..12f632b45f4f5a3636f1c8dd0b545716975fe5c6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -17,17 +17,16 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -43,7 +42,9 @@ public class ProducerManager {
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap> groupChannelTable =
new HashMap>();
+ private final ConcurrentHashMap clientChannelTable = new ConcurrentHashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+
public ProducerManager() {
}
@@ -90,6 +91,7 @@ public class ProducerManager {
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
+ clientChannelTable.remove(info.getClientId());
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
@@ -121,6 +123,7 @@ public class ProducerManager {
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
+ clientChannelTable.remove(clientChannelInfo.getClientId());
log.info(
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
@@ -154,6 +157,7 @@ public class ProducerManager {
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+ clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
@@ -179,6 +183,7 @@ public class ProducerManager {
HashMap channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+ clientChannelTable.remove(clientChannelInfo.getClientId());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
@@ -231,4 +236,8 @@ public class ProducerManager {
}
return null;
}
+
+ public Channel findChannel(String clientId) {
+ return clientChannelTable.get(clientId);
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..565857a34779c126154694f0ac6b0881d60d5007
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ public ReplyMessageProcessor(final BrokerController brokerController) {
+ super(brokerController);
+ }
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ SendMessageContext mqtraceContext = null;
+ SendMessageRequestHeader requestHeader = parseRequestHeader(request);
+ if (requestHeader == null) {
+ return null;
+ }
+
+ mqtraceContext = buildMsgContext(ctx, requestHeader);
+ this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
+
+ RemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader);
+
+ this.executeSendMessageHookAfter(response, mqtraceContext);
+ return response;
+ }
+
+ @Override
+ protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException {
+ SendMessageRequestHeaderV2 requestHeaderV2 = null;
+ SendMessageRequestHeader requestHeader = null;
+ switch (request.getCode()) {
+ case RequestCode.SEND_REPLY_MESSAGE_V2:
+ requestHeaderV2 =
+ (SendMessageRequestHeaderV2) request
+ .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+ case RequestCode.SEND_REPLY_MESSAGE:
+ if (null == requestHeaderV2) {
+ requestHeader =
+ (SendMessageRequestHeader) request
+ .decodeCommandCustomHeader(SendMessageRequestHeader.class);
+ } else {
+ requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
+ }
+ default:
+ break;
+ }
+ return requestHeader;
+ }
+
+ private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx,
+ final RemotingCommand request,
+ final SendMessageContext sendMessageContext,
+ final SendMessageRequestHeader requestHeader) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+
+ response.setOpaque(request.getOpaque());
+
+ response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+ response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+
+ log.debug("receive SendReplyMessage request command, {}", request);
+ final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
+ if (this.brokerController.getMessageStore().now() < startTimstamp) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
+ return response;
+ }
+
+ response.setCode(-1);
+ super.msgCheck(ctx, requestHeader, response);
+ if (response.getCode() != -1) {
+ return response;
+ }
+
+ final byte[] body = request.getBody();
+
+ int queueIdInt = requestHeader.getQueueId();
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+
+ if (queueIdInt < 0) {
+ queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+ }
+
+ MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+ msgInner.setTopic(requestHeader.getTopic());
+ msgInner.setQueueId(queueIdInt);
+ msgInner.setBody(body);
+ msgInner.setFlag(requestHeader.getFlag());
+ MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+ msgInner.setPropertiesString(requestHeader.getProperties());
+ msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
+ msgInner.setBornHost(ctx.channel().remoteAddress());
+ msgInner.setStoreHost(this.getStoreHost());
+ msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+
+ PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner);
+ this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);
+
+ if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
+ PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+ this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
+ }
+
+ return response;
+ }
+
+ private PushReplyResult pushReplyMessage(final ChannelHandlerContext ctx,
+ final SendMessageRequestHeader requestHeader,
+ final Message msg) {
+ ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader();
+ replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString());
+ replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString());
+ replyMessageRequestHeader.setStoreTimestamp(System.currentTimeMillis());
+ replyMessageRequestHeader.setProducerGroup(requestHeader.getProducerGroup());
+ replyMessageRequestHeader.setTopic(requestHeader.getTopic());
+ replyMessageRequestHeader.setDefaultTopic(requestHeader.getDefaultTopic());
+ replyMessageRequestHeader.setDefaultTopicQueueNums(requestHeader.getDefaultTopicQueueNums());
+ replyMessageRequestHeader.setQueueId(requestHeader.getQueueId());
+ replyMessageRequestHeader.setSysFlag(requestHeader.getSysFlag());
+ replyMessageRequestHeader.setBornTimestamp(requestHeader.getBornTimestamp());
+ replyMessageRequestHeader.setFlag(requestHeader.getFlag());
+ replyMessageRequestHeader.setProperties(requestHeader.getProperties());
+ replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes());
+ replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode());
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader);
+ request.setBody(msg.getBody());
+
+ String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+ PushReplyResult pushReplyResult = new PushReplyResult(false);
+
+ if (senderId != null) {
+ Channel channel = this.brokerController.getProducerManager().findChannel(senderId);
+ if (channel != null) {
+ msg.getProperties().put(MessageConst.PROPERTY_PUSH_REPLY_TIME, String.valueOf(System.currentTimeMillis()));
+ replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+ try {
+ RemotingCommand pushResponse = this.brokerController.getBroker2Client().callClient(channel, request);
+ assert pushResponse != null;
+ switch (pushResponse.getCode()) {
+ case ResponseCode.SUCCESS: {
+ pushReplyResult.setPushOk(true);
+ break;
+ }
+ default: {
+ pushReplyResult.setPushOk(false);
+ pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
+ log.warn("push reply message to <{}> return fail, response remark: {}", senderId, pushResponse.getRemark());
+ }
+ }
+ } catch (RemotingException | InterruptedException e) {
+ pushReplyResult.setPushOk(false);
+ pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
+ log.warn("push reply message to <{}> fail. {}", senderId, channel, e);
+ }
+ } else {
+ pushReplyResult.setPushOk(false);
+ pushReplyResult.setRemark("push reply message fail, channel of <" + senderId + "> not found.");
+ log.warn(pushReplyResult.getRemark());
+ }
+ } else {
+ log.warn(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + " is null, can not reply message");
+ pushReplyResult.setPushOk(false);
+ pushReplyResult.setRemark("reply message properties[" + MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + "] is null");
+ }
+ return pushReplyResult;
+ }
+
+ private void handlePushReplyResult(PushReplyResult pushReplyResult, final RemotingCommand response,
+ final SendMessageResponseHeader responseHeader, int queueIdInt) {
+
+ if (!pushReplyResult.isPushOk()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(pushReplyResult.getRemark());
+ } else {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ //set to zore to avoid client decoding exception
+ responseHeader.setMsgId("0");
+ responseHeader.setQueueId(queueIdInt);
+ responseHeader.setQueueOffset(0L);
+ }
+ }
+
+ private void handlePutMessageResult(PutMessageResult putMessageResult,
+ final RemotingCommand request, final MessageExt msg,
+ final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext,
+ int queueIdInt) {
+ if (putMessageResult == null) {
+ log.warn("process reply message, store putMessage return null");
+ return;
+ }
+ boolean putOk = false;
+
+ switch (putMessageResult.getPutMessageStatus()) {
+ // Success
+ case PUT_OK:
+ case FLUSH_DISK_TIMEOUT:
+ case FLUSH_SLAVE_TIMEOUT:
+ case SLAVE_NOT_AVAILABLE:
+ putOk = true;
+ break;
+
+ // Failed
+ case CREATE_MAPEDFILE_FAILED:
+ log.info("create mapped file failed, server is busy or broken.");
+ break;
+ case MESSAGE_ILLEGAL:
+ log.info(
+ "the message is illegal, maybe msg properties length limit 32k.");
+ break;
+ case PROPERTIES_SIZE_EXCEEDED:
+ log.info(
+ "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k.");
+ break;
+ case SERVICE_NOT_AVAILABLE:
+ log.info(
+ "service not available now, maybe disk full, maybe your broker machine memory too small.");
+ break;
+ case OS_PAGECACHE_BUSY:
+ log.info("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
+ break;
+ case UNKNOWN_ERROR:
+ log.info("UNKNOWN_ERROR");
+ break;
+ default:
+ log.info("UNKNOWN_ERROR DEFAULT");
+ break;
+ }
+
+ String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+ if (putOk) {
+ this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+ this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
+ putMessageResult.getAppendMessageResult().getWroteBytes());
+ this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+
+ responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
+ responseHeader.setQueueId(queueIdInt);
+ responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+
+ if (hasSendMessageHook()) {
+ sendMessageContext.setMsgId(responseHeader.getMsgId());
+ sendMessageContext.setQueueId(responseHeader.getQueueId());
+ sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+
+ int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+ int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+
+ sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
+ sendMessageContext.setCommercialSendTimes(incValue);
+ sendMessageContext.setCommercialSendSize(wroteSize);
+ sendMessageContext.setCommercialOwner(owner);
+ }
+ } else {
+ if (hasSendMessageHook()) {
+ int wroteSize = request.getBody().length;
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+ sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
+ sendMessageContext.setCommercialSendTimes(incValue);
+ sendMessageContext.setCommercialSendSize(wroteSize);
+ sendMessageContext.setCommercialOwner(owner);
+ }
+ }
+ }
+
+ class PushReplyResult {
+ boolean pushOk;
+ String remark;
+
+ public PushReplyResult(boolean pushOk) {
+ this.pushOk = pushOk;
+ remark = "";
+ }
+
+ public boolean isPushOk() {
+ return pushOk;
+ }
+
+ public void setPushOk(boolean pushOk) {
+ this.pushOk = pushOk;
+ }
+
+ public String getRemark() {
+ return remark;
+ }
+
+ public void setRemark(String remark) {
+ this.remark = remark;
+ }
+ }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 8035ae6f185b5c91db73eb0ceea5e0a26f43b3f3..2589a7547dcf7c3c353f839f090552964198518d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -343,11 +343,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
- msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+ String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
+ MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = null;
Map oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -536,6 +538,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+ String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
+ MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 8f215cdcb9cde3934b225f1e36e2d638d48dc4fb..cb2901117f83c16480f43e9d715febcbe803cac7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -34,11 +34,11 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
public class TopicConfigManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -134,6 +134,14 @@ public class TopicConfigManager extends ConfigManager {
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
+ {
+ String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
}
public boolean isSystemTopic(final String topic) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index 33995ff2fe3fe0f5a38c06ae7147991906c20bed..d9539b6703514bf11997a9f1be9d34add8df1888 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -20,6 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field;
import java.util.HashMap;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,14 +43,14 @@ public class ProducerManagerTest {
@Before
public void init() {
producerManager = new ProducerManager();
- clientInfo = new ClientChannelInfo(channel);
+ clientInfo = new ClientChannelInfo(channel, "clientId", LanguageCode.JAVA, 0);
}
@Test
public void scanNotActiveChannel() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
-
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
field.setAccessible(true);
long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
@@ -57,22 +58,28 @@ public class ProducerManagerTest {
when(channel.close()).thenReturn(mock(ChannelFuture.class));
producerManager.scanNotActiveChannel();
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
+ assertThat(producerManager.findChannel("clientId")).isNull();
}
@Test
public void doChannelCloseEvent() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
producerManager.doChannelCloseEvent("127.0.0.1", channel);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
+ assertThat(producerManager.findChannel("clientId")).isNull();
}
@Test
public void testRegisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
HashMap channelMap = producerManager.getGroupChannelTable().get(group);
+ Channel channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNotNull();
+ assertThat(channel1).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
+ assertThat(channel1).isEqualTo(channel);
}
@Test
@@ -81,10 +88,15 @@ public class ProducerManagerTest {
HashMap channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
-
+ Channel channel1 = producerManager.findChannel("clientId");
+ assertThat(channel1).isNotNull();
+ assertThat(channel1).isEqualTo(channel);
producerManager.unregisterProducer(group, clientInfo);
channelMap = producerManager.getGroupChannelTable().get(group);
+ channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNull();
+ assertThat(channel1).isNull();
+
}
@Test
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..85c775040f53ef6093e979c8c6adc53a3a36161f
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ReplyMessageProcessorTest {
+ private ReplyMessageProcessor replyMessageProcessor;
+ @Spy
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ @Mock
+ private ChannelHandlerContext handlerContext;
+ @Mock
+ private MessageStore messageStore;
+ @Mock
+ private Channel channel;
+
+ private String topic = "FooBar";
+ private String group = "FooBarGroup";
+ private ClientChannelInfo clientInfo;
+ @Mock
+ private Broker2Client broker2Client;
+
+ @Before
+ public void init() throws IllegalAccessException, NoSuchFieldException {
+ clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0);
+ brokerController.setMessageStore(messageStore);
+ Field field = BrokerController.class.getDeclaredField("broker2Client");
+ field.setAccessible(true);
+ field.set(brokerController, broker2Client);
+ when(messageStore.now()).thenReturn(System.currentTimeMillis());
+ Channel mockChannel = mock(Channel.class);
+ when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
+ when(handlerContext.channel()).thenReturn(mockChannel);
+ replyMessageProcessor = new ReplyMessageProcessor(brokerController);
+ }
+
+ @Test
+ public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
+ when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ brokerController.getProducerManager().registerProducer(group, clientInfo);
+ final RemotingCommand request = createSendMessageRequestHeaderCommand(RequestCode.SEND_REPLY_MESSAGE);
+ when(brokerController.getBroker2Client().callClient(any(Channel.class), any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS, request));
+ RemotingCommand responseToReturn = replyMessageProcessor.processRequest(handlerContext, request);
+ assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
+ }
+
+ private RemotingCommand createSendMessageRequestHeaderCommand(int requestCode) {
+ SendMessageRequestHeader requestHeader = createSendMessageRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
+ request.setBody(new byte[] {'a'});
+ request.makeCustomHeaderToNet();
+ return request;
+ }
+
+ private SendMessageRequestHeader createSendMessageRequestHeader() {
+ SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+ requestHeader.setProducerGroup(group);
+ requestHeader.setTopic(topic);
+ requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
+ requestHeader.setDefaultTopicQueueNums(3);
+ requestHeader.setQueueId(1);
+ requestHeader.setSysFlag(0);
+ requestHeader.setBornTimestamp(System.currentTimeMillis());
+ requestHeader.setFlag(124);
+ requestHeader.setReconsumeTimes(0);
+ Map map = new HashMap();
+ map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1");
+ requestHeader.setProperties(MessageDecoder.messageProperties2String(map));
+ return requestHeader;
+ }
+
+ private RemotingCommand createResponse(int code, RemotingCommand request) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ response.setCode(code);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index c3e4efa252c1c8fb4e3d9b2d0214e2b5b965d736..d0ae5e1b8315370c3ce3373c330e9a7d0df2ca84 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -53,6 +53,7 @@ public class ClientConfig {
* Offset persistent interval for consumer
*/
private int persistConsumerOffsetInterval = 1000 * 5;
+ private long pullTimeDelayMillsWhenException = 1000;
private boolean unitMode = false;
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
@@ -148,6 +149,7 @@ public class ClientConfig {
this.pollNameServerInterval = cc.pollNameServerInterval;
this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
+ this.pullTimeDelayMillsWhenException = cc.pullTimeDelayMillsWhenException;
this.unitMode = cc.unitMode;
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
@@ -165,6 +167,7 @@ public class ClientConfig {
cc.pollNameServerInterval = pollNameServerInterval;
cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+ cc.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
cc.unitMode = unitMode;
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
@@ -222,6 +225,14 @@ public class ClientConfig {
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
}
+ public long getPullTimeDelayMillsWhenException() {
+ return pullTimeDelayMillsWhenException;
+ }
+
+ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+ this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+ }
+
public String getUnitName() {
return unitName;
}
@@ -287,12 +298,13 @@ public class ClientConfig {
this.accessChannel = accessChannel;
}
+
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
- + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
- + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+ + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
index 62a95dfa5a1ee3acc03bc375dec68fbb867c6751..bc03b14df247f79c5e2e5d07136e77935288c9d3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
@@ -23,4 +23,6 @@ public class ClientErrorCode {
public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
public static final int NO_NAME_SERVER_EXCEPTION = 10004;
public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
+ public static final int REQUEST_TIMEOUT_EXCEPTION = 10006;
+ public static final int CREATE_REPLY_MESSAGE_EXCEPTION = 10007;
}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java
new file mode 100644
index 0000000000000000000000000000000000000000..2d756ece617d1da158157ed382bb9ffd799dfc63
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.exception;
+
+import org.apache.rocketmq.common.UtilAll;
+
+public class RequestTimeoutException extends Exception {
+ private static final long serialVersionUID = -5758410930844185841L;
+ private int responseCode;
+ private String errorMessage;
+
+ public RequestTimeoutException(String errorMessage, Throwable cause) {
+ super(errorMessage, cause);
+ this.responseCode = -1;
+ this.errorMessage = errorMessage;
+ }
+
+ public RequestTimeoutException(int responseCode, String errorMessage) {
+ super("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ + errorMessage);
+ this.responseCode = responseCode;
+ this.errorMessage = errorMessage;
+ }
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+ public RequestTimeoutException setResponseCode(final int responseCode) {
+ this.responseCode = responseCode;
+ return this;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(final String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index 0bd810a1e1db1b4df45e091052e2aac7394a2130..5861bc4baa18405eded9e4c4da35ad00eb145643 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.client.impl;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-
-import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.RequestFutureTable;
+import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -42,14 +45,16 @@ import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRe
import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
public class ClientRemotingProcessor implements NettyRequestProcessor {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
@@ -76,6 +81,9 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
+
+ case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
+ return this.receiveReplyMessage(ctx, request);
default:
break;
}
@@ -213,4 +221,73 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
return response;
}
+
+ private RemotingCommand receiveReplyMessage(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ long receiveTime = System.currentTimeMillis();
+ ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+ try {
+ MessageExt msg = new MessageExt();
+ msg.setTopic(requestHeader.getTopic());
+ msg.setQueueId(requestHeader.getQueueId());
+ msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+ if (requestHeader.getBornHost() != null) {
+ msg.setBornHost(RemotingUtil.string2SocketAddress(requestHeader.getBornHost()));
+ }
+
+ if (requestHeader.getStoreHost() != null) {
+ msg.setStoreHost(RemotingUtil.string2SocketAddress(requestHeader.getStoreHost()));
+ }
+
+ byte[] body = request.getBody();
+ if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+ try {
+ body = UtilAll.uncompress(body);
+ } catch (IOException e) {
+ log.warn("err when uncompress constant", e);
+ }
+ }
+ msg.setBody(body);
+ msg.setFlag(requestHeader.getFlag());
+ MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
+ msg.setBornTimestamp(requestHeader.getBornTimestamp());
+ msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+ log.debug("receive reply message :{}", msg);
+
+ processReplyMessage(msg);
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ } catch (Exception e) {
+ log.warn("unknown err when receiveReplyMsg", e);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("process reply message fail");
+ }
+ return response;
+ }
+
+ private void processReplyMessage(MessageExt replyMsg) {
+ final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+ final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+ if (requestResponseFuture != null) {
+ requestResponseFuture.putResponseMessage(replyMsg);
+
+ RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+ if (requestResponseFuture.getRequestCallback() != null) {
+ requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+ } else {
+ requestResponseFuture.putResponseMessage(replyMsg);
+ }
+ } else {
+ String bornHost = replyMsg.getBornHostString();
+ log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",
+ correlationId, bornHost));
+ }
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b9ace0f4331ccc59a588536f1b00f3632aeae141..1ad5fbfe6fd02ca791bba04d14529fa1375ac1d6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -201,6 +200,8 @@ public class MQClientAPIImpl {
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
+
+ this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
public List getNameServerAddressList() {
@@ -304,8 +305,8 @@ public class MQClientAPIImpl {
requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
- requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),","));
- requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),","));
+ requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ","));
+ requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ","));
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
@@ -344,7 +345,7 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis)
+ public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader();
@@ -366,7 +367,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
+ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,
+ final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null);
@@ -389,7 +391,7 @@ public class MQClientAPIImpl {
}
throw new MQBrokerException(response.getCode(), response.getRemark());
-
+
}
public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
@@ -445,13 +447,23 @@ public class MQClientAPIImpl {
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
- if (sendSmartMsg || msg instanceof MessageBatch) {
- SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+ String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
+ boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
+ if (isReply) {
+ if (sendSmartMsg) {
+ SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
+ request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
+ } else {
+ request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
+ }
} else {
- request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+ if (sendSmartMsg || msg instanceof MessageBatch) {
+ SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
+ request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+ } else {
+ request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+ }
}
-
request.setBody(msg.getBody());
switch (communicationMode) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index a37c3a0149047a7313e9cc2f3a92e3b4bc0a07c4..2c673a1b062a7aa181c5d753eaab4a8eb824ef4a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -106,7 +106,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
/**
* Delay some time when exception occur
*/
- private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000;
+ private long pullTimeDelayMillsWhenException = 1000;
/**
* Flow control interval
*/
@@ -156,6 +156,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return new Thread(r, "MonitorMessageQueueChangeThread");
}
});
+ this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
private void checkServiceState() {
@@ -783,7 +784,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Throwable e) {
- pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
+ pullDelayTimeMills = pullTimeDelayMillsWhenException;
log.error("An error occurred in pull message process.", e);
}
@@ -1070,4 +1071,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
+
+ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+ this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index d1b5de15eec1f26de0e4b042c8d55a24c1316006..807e9c6d6fcc67796a8fce76166b42fa615467ec 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -83,7 +83,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
* Delay some time when exception occur
*/
- private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
+ private long pullTimeDelayMillsWhenException = 3000;
/**
* Flow control interval
*/
@@ -115,6 +115,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
+ this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
@@ -222,7 +223,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
@@ -282,7 +283,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest.setNextOffset(offset);
}
} else {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
@@ -290,7 +291,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
@@ -397,7 +398,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
log.warn("execute the pull request exception", e);
}
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
@@ -444,7 +445,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
@@ -1168,4 +1169,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.consumeMessageService = consumeMessageService;
}
+
+ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+ this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 5b7ba9c6f79d24adb5e9e48b7d777e6c2aa0c018..fca50cc565ceb398d66f5c1d28d98467396db221 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -39,6 +41,7 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
@@ -52,6 +55,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.RequestCallback;
+import org.apache.rocketmq.client.producer.RequestFutureTable;
+import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
@@ -79,6 +85,7 @@ import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHe
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.utils.CorrelationIdUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -95,17 +102,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
new ConcurrentHashMap();
private final ArrayList sendMessageHookList = new ArrayList();
private final RPCHook rpcHook;
+ private final BlockingQueue asyncSenderThreadPoolQueue;
+ private final ExecutorService defaultAsyncSenderExecutor;
+ private final Timer timer = new Timer("RequestHouseKeepingService", true);
protected BlockingQueue checkRequestQueue;
protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private ArrayList checkForbiddenHookList = new ArrayList();
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
-
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
-
- private final BlockingQueue asyncSenderThreadPoolQueue;
- private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
@@ -212,6 +218,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+
+ this.timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ RequestFutureTable.scanExpiredRequest();
+ } catch (Throwable e) {
+ log.error("scan RequestFutureTable exception", e);
+ }
+ }
+ }, 1000 * 3, 1000);
}
private void checkConfig() throws MQClientException {
@@ -1325,6 +1342,233 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
+ public Message request(Message msg,
+ long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+ try {
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setSendReqeustOk(false);
+ requestResponseFuture.putResponseMessage(null);
+ requestResponseFuture.setCause(e);
+ }
+ }, timeout - cost);
+
+ Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+ if (responseMessage == null) {
+ if (requestResponseFuture.isSendRequestOk()) {
+ throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+ "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+ } else {
+ throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+ }
+ }
+ return responseMessage;
+ } finally {
+ RequestFutureTable.getRequestFutureTable().remove(correlationId);
+ }
+ }
+
+ public void request(Message msg, final RequestCallback requestCallback, long timeout)
+ throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setCause(e);
+ requestFail(correlationId);
+ }
+ }, timeout - cost);
+ }
+
+ public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException, RequestTimeoutException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+ try {
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setSendReqeustOk(false);
+ requestResponseFuture.putResponseMessage(null);
+ requestResponseFuture.setCause(e);
+ }
+ }, timeout - cost);
+
+ Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+ if (responseMessage == null) {
+ if (requestResponseFuture.isSendRequestOk()) {
+ throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+ "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+ } else {
+ throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+ }
+ }
+ return responseMessage;
+ } finally {
+ RequestFutureTable.getRequestFutureTable().remove(correlationId);
+ }
+ }
+
+ public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final RequestCallback requestCallback, final long timeout)
+ throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setCause(e);
+ requestFail(correlationId);
+ }
+ }, timeout - cost);
+
+ }
+
+ public Message request(final Message msg, final MessageQueue mq, final long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+ try {
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setSendReqeustOk(false);
+ requestResponseFuture.putResponseMessage(null);
+ requestResponseFuture.setCause(e);
+ }
+ }, null, timeout - cost);
+
+ Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+ if (responseMessage == null) {
+ if (requestResponseFuture.isSendRequestOk()) {
+ throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+ "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+ } else {
+ throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+ }
+ }
+ return responseMessage;
+ } finally {
+ RequestFutureTable.getRequestFutureTable().remove(correlationId);
+ }
+ }
+
+ public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+ throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setCause(e);
+ requestFail(correlationId);
+ }
+ }, null, timeout - cost);
+ }
+
+ private void requestFail(final String correlationId) {
+ RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId);
+ if (responseFuture != null) {
+ responseFuture.setSendReqeustOk(false);
+ responseFuture.putResponseMessage(null);
+ try {
+ responseFuture.executeRequestCallback();
+ } catch (Exception e) {
+ log.warn("execute requestCallback in requestFail, and callback throw", e);
+ }
+ }
+ }
+
+ private void prepareSendRequest(final Message msg, long timeout) {
+ String correlationId = CorrelationIdUtil.createCorrelationId();
+ String requestClientId = this.getmQClientFactory().getClientId();
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestClientId);
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
+
+ boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
+ if (!hasRouteData) {
+ long beginTimestamp = System.currentTimeMillis();
+ this.tryToFindTopicPublishInfo(msg.getTopic());
+ this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ if (cost > 500) {
+ log.warn("prepare send request for <{}> cost {} ms", msg.getTopic(), cost);
+ }
+ }
+ }
+
public ConcurrentMap getTopicPublishInfoTable() {
return topicPublishInfoTable;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index b4acf8f1c3241ce96a6c0729a00937bf97c54653..faa79f54c93924eb5f6841698c7ad53e98cd670b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
@@ -42,38 +43,29 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
- * This class is the entry point for applications intending to send messages.
- *
+ * This class is the entry point for applications intending to send messages.
*
* It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
- * box for most scenarios.
- *
+ * box for most scenarios.
*
* This class aggregates various send
methods to deliver messages to brokers. Each of them has pros and
- * cons; you'd better understand strengths and weakness of them before actually coding.
- *
+ * cons; you'd better understand strengths and weakness of them before actually coding.
*
- *
- * Thread Safety: After configuring and starting process, this class can be regarded as thread-safe
- * and used among multiple threads context.
- *
+ * Thread Safety: After configuring and starting process, this class can be regarded as thread-safe
+ * and used among multiple threads context.
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {
- private final InternalLogger log = ClientLogger.getLog();
-
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
-
+ private final InternalLogger log = ClientLogger.getLog();
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
- * important when transactional messages are involved.
- *
+ * important when transactional messages are involved.
*
- * For non-transactional messages, it does not matter as long as it's unique per process.
- *
+ * For non-transactional messages, it does not matter as long as it's unique per process.
*
* See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
*/
@@ -100,16 +92,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
- * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
- *
+ * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
- * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
- *
+ * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
@@ -268,14 +258,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
- * Start this producer instance.
- *
+ * Start this producer instance.
*
- *
- * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
- * this method before sending or querying messages.
- *
- *
+ * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must
+ * to invoke this method before sending or querying messages.
*
* @throws MQClientException if there is any unexpected error.
*/
@@ -316,8 +302,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
- * Send message in synchronous mode. This method returns only when the sending procedure totally completes.
- *
+ * Send message in synchronous mode. This method returns only when the sending procedure totally completes.
*
* Warn: this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
@@ -359,11 +344,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
- * Send message to broker asynchronously.
- *
+ * Send message to broker asynchronously.
*
- * This method returns immediately. On sending completion, sendCallback
will be executed.
- *
+ * This method returns immediately. On sending completion, sendCallback
will be executed.
*
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
@@ -582,6 +565,133 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
+ /**
+ * Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message.
+ *
+ * Warn: this method has internal retry-mechanism, that is, internal implementation will retry
+ * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
+ * delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
+ *
+ * @param msg request message to send
+ * @param timeout request timeout
+ * @return reply message
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if request timeout.
+ */
+ @Override
+ public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
+ RemotingException, MQBrokerException, InterruptedException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ return this.defaultMQProducerImpl.request(msg, timeout);
+ }
+
+ /**
+ * Request asynchronously.
+ * This method returns immediately. On receiving reply message, requestCallback
will be executed.
+ *
+ * Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
+ * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
+ * application developers are the one to resolve this potential issue.
+ *
+ * @param msg request message to send
+ * @param requestCallback callback to execute on request completion.
+ * @param timeout request timeout
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
+ throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ this.defaultMQProducerImpl.request(msg, requestCallback, timeout);
+ }
+
+ /**
+ * Same to {@link #request(Message, long)} with message queue selector specified.
+ *
+ * @param msg request message to send
+ * @param selector message queue selector, through which we get target message queue to deliver message to.
+ * @param arg argument to work along with message queue selector.
+ * @param timeout timeout of request.
+ * @return reply message
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if request timeout.
+ */
+ @Override
+ public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException, RequestTimeoutException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ return this.defaultMQProducerImpl.request(msg, selector, arg, timeout);
+ }
+
+ /**
+ * Same to {@link #request(Message, RequestCallback, long)} with target message selector specified.
+ *
+ * @param msg requst message to send
+ * @param selector message queue selector, through which we get target message queue to deliver message to.
+ * @param arg argument to work along with message queue selector.
+ * @param requestCallback callback to execute on request completion.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException,
+ InterruptedException, MQBrokerException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout);
+ }
+
+ /**
+ * Same to {@link #request(Message, long)} with target message queue specified in addition.
+ *
+ * @param msg request message to send
+ * @param mq target message queue.
+ * @param timeout request timeout
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if request timeout.
+ */
+ @Override
+ public Message request(final Message msg, final MessageQueue mq, final long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ return this.defaultMQProducerImpl.request(msg, mq, timeout);
+ }
+
+ /**
+ * Same to {@link #request(Message, RequestCallback, long)} with target message queue specified.
+ *
+ * @param msg request message to send
+ * @param mq target message queue.
+ * @param requestCallback callback to execute on request completion.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+ throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout);
+ }
+
/**
* Same to {@link #sendOneway(Message)} with message queue selector specified.
*
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 1af6005748992d2334e0833c415500a9fbe96a57..c6cf4c93596578e958de17815fa9604e2c09b7fe 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -98,4 +99,26 @@ public interface MQProducer extends MQAdmin {
SendResult send(final Collection msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+ //for rpc
+ Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
+ RemotingException, MQBrokerException, InterruptedException;
+
+ void request(final Message msg, final RequestCallback requestCallback, final long timeout)
+ throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
+
+ Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
+
+ void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final RequestCallback requestCallback,
+ final long timeout) throws MQClientException, RemotingException,
+ InterruptedException, MQBrokerException;
+
+ Message request(final Message msg, final MessageQueue mq, final long timeout)
+ throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+ void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java
new file mode 100644
index 0000000000000000000000000000000000000000..3107ba57d6eaebeb6c982ba720facb302d0f24b6
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.Message;
+
+public interface RequestCallback {
+ void onSuccess(final Message message);
+
+ void onException(final Throwable e);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
new file mode 100644
index 0000000000000000000000000000000000000000..3d4caa208bd203c8842c389f763181f20f36a0b0
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.logging.InternalLogger;
+
+public class RequestFutureTable {
+ private static InternalLogger log = ClientLogger.getLog();
+ private static ConcurrentHashMap requestFutureTable = new ConcurrentHashMap();
+
+ public static ConcurrentHashMap getRequestFutureTable() {
+ return requestFutureTable;
+ }
+
+ public static void scanExpiredRequest() {
+ final List rfList = new LinkedList();
+ Iterator> it = requestFutureTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry next = it.next();
+ RequestResponseFuture rep = next.getValue();
+
+ if (rep.isTimeout()) {
+ it.remove();
+ rfList.add(rep);
+ log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
+ }
+ }
+
+ for (RequestResponseFuture rf : rfList) {
+ try {
+ Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
+ rf.setCause(cause);
+ rf.executeRequestCallback();
+ } catch (Throwable e) {
+ log.warn("scanResponseTable, operationComplete Exception", e);
+ }
+ }
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
new file mode 100644
index 0000000000000000000000000000000000000000..c54b236b1616fcf9d4f586af8ac2f1e52d09c85f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.message.Message;
+
+public class RequestResponseFuture {
+ private final String correlationId;
+ private final RequestCallback requestCallback;
+ private final long beginTimestamp = System.currentTimeMillis();
+ private final Message requestMsg = null;
+ private long timeoutMillis;
+ private CountDownLatch countDownLatch = new CountDownLatch(1);
+ private volatile Message responseMsg = null;
+ private volatile boolean sendRequestOk = true;
+ private volatile Throwable cause = null;
+
+ public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) {
+ this.correlationId = correlationId;
+ this.timeoutMillis = timeoutMillis;
+ this.requestCallback = requestCallback;
+ }
+
+ public void executeRequestCallback() {
+ if (requestCallback != null) {
+ if (sendRequestOk && cause == null) {
+ requestCallback.onSuccess(responseMsg);
+ } else {
+ requestCallback.onException(cause);
+ }
+ }
+ }
+
+ public boolean isTimeout() {
+ long diff = System.currentTimeMillis() - this.beginTimestamp;
+ return diff > this.timeoutMillis;
+ }
+
+ public Message waitResponseMessage(final long timeout) throws InterruptedException {
+ this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+ return this.responseMsg;
+ }
+
+ public void putResponseMessage(final Message responseMsg) {
+ this.responseMsg = responseMsg;
+ this.countDownLatch.countDown();
+ }
+
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ public long getTimeoutMillis() {
+ return timeoutMillis;
+ }
+
+ public void setTimeoutMillis(long timeoutMillis) {
+ this.timeoutMillis = timeoutMillis;
+ }
+
+ public RequestCallback getRequestCallback() {
+ return requestCallback;
+ }
+
+ public long getBeginTimestamp() {
+ return beginTimestamp;
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ public Message getResponseMsg() {
+ return responseMsg;
+ }
+
+ public void setResponseMsg(Message responseMsg) {
+ this.responseMsg = responseMsg;
+ }
+
+ public boolean isSendRequestOk() {
+ return sendRequestOk;
+ }
+
+ public void setSendReqeustOk(boolean sendReqeustOk) {
+ this.sendRequestOk = sendReqeustOk;
+ }
+
+ public Message getRequestMsg() {
+ return requestMsg;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+
+ public void setCause(Throwable cause) {
+ this.cause = cause;
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..416ba44da3284eaa711a10a735b8b6edeb1e51f8
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.utils;
+
+import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+
+public class MessageUtil {
+ public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {
+ if (requestMessage != null) {
+ Message replyMessage = new Message();
+ String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);
+ String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+ String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+ String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);
+ replyMessage.setBody(body);
+ if (cluster != null) {
+ String replyTopic = MixAll.getReplyTopic(cluster);
+ replyMessage.setTopic(replyTopic);
+ MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
+ MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
+ MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo);
+ MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);
+
+ return replyMessage;
+ } else {
+ throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
+ }
+ }
+ throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");
+ }
+
+ public static String getReplyToClient(final Message msg) {
+ return msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+ }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 84af63235570bcfbe9d9919a7b39cfe0283e91a5..3f00d9e4030473f395c9b2a037e50b7e6fde24f0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
@@ -164,7 +165,7 @@ public class MQClientAPIImplTest {
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
- ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
+ ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
@@ -289,6 +290,7 @@ public class MQClientAPIImplTest {
assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
}
}
+
@Test
public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
doAnswer(new Answer() {
@@ -322,6 +324,38 @@ public class MQClientAPIImplTest {
assertThat(result).isEqualTo(true);
}
+ @Test
+ public void testSendMessageTypeofReply() throws Exception {
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ InvokeCallback callback = mock.getArgument(3);
+ RemotingCommand request = mock.getArgument(1);
+ ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+ responseFuture.setResponseCommand(createSuccessResponse(request));
+ callback.operationComplete(responseFuture);
+ return null;
+ }
+ }).when(remotingClient).invokeAsync(Matchers.anyString(), Matchers.any(RemotingCommand.class), Matchers.anyLong(), Matchers.any(InvokeCallback.class));
+ SendMessageContext sendMessageContext = new SendMessageContext();
+ sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
+ msg.getProperties().put("MSG_TYPE", "reply");
+ mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
+ new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+ assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ }
+ }, null, null, 0, sendMessageContext, defaultMQProducerImpl);
+ }
+
private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index cb32bda8ac218dd64d87d3be981809623e38c235..cc0b801380bb2116465ee3d010c4563efd9269f3 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -21,15 +21,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -45,6 +48,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
@@ -337,6 +341,101 @@ public class DefaultMQProducerTest {
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
+ @Test
+ public void testRequestMessage() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ final AtomicBoolean finish = new AtomicBoolean(false);
+ new Thread(new Runnable() {
+ @Override public void run() {
+ ConcurrentHashMap responseMap = RequestFutureTable.getRequestFutureTable();
+ assertThat(responseMap).isNotNull();
+ while (!finish.get()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ }
+ for (Map.Entry entry : responseMap.entrySet()) {
+ RequestResponseFuture future = entry.getValue();
+ future.putResponseMessage(message);
+ }
+ }
+ }
+ }).start();
+ Message result = producer.request(message, 3 * 1000L);
+ finish.getAndSet(true);
+ assertThat(result.getTopic()).isEqualTo("FooBar");
+ assertThat(result.getBody()).isEqualTo(new byte[] {'a'});
+ }
+
+ @Test(expected = RequestTimeoutException.class)
+ public void testRequestMessage_RequestTimeoutException() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ Message result = producer.request(message, 3 * 1000L);
+ }
+
+ @Test
+ public void testAsyncRequest_OnSuccess() throws Exception {
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ RequestCallback requestCallback = new RequestCallback() {
+ @Override public void onSuccess(Message message) {
+ assertThat(message.getTopic()).isEqualTo("FooBar");
+ assertThat(message.getBody()).isEqualTo(new byte[] {'a'});
+ assertThat(message.getFlag()).isEqualTo(1);
+ countDownLatch.countDown();
+ }
+
+ @Override public void onException(Throwable e) {
+ }
+ };
+ producer.request(message, requestCallback, 3 * 1000L);
+ ConcurrentHashMap responseMap = RequestFutureTable.getRequestFutureTable();
+ assertThat(responseMap).isNotNull();
+ for (Map.Entry entry : responseMap.entrySet()) {
+ RequestResponseFuture future = entry.getValue();
+ future.setSendReqeustOk(true);
+ message.setFlag(1);
+ future.getRequestCallback().onSuccess(message);
+ }
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ public void testAsyncRequest_OnException() throws Exception {
+ final AtomicInteger cc = new AtomicInteger(0);
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ RequestCallback requestCallback = new RequestCallback() {
+ @Override public void onSuccess(Message message) {
+
+ }
+
+ @Override public void onException(Throwable e) {
+ cc.incrementAndGet();
+ countDownLatch.countDown();
+ }
+ };
+ MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
+ @Override
+ public MessageQueue select(List mqs, Message msg, Object arg) {
+ return null;
+ }
+ };
+
+ try {
+ producer.request(message, requestCallback, 3 * 1000L);
+ failBecauseExceptionWasNotThrown(Exception.class);
+ } catch (Exception e) {
+ ConcurrentHashMap responseMap = RequestFutureTable.getRequestFutureTable();
+ assertThat(responseMap).isNotNull();
+ for (Map.Entry entry : responseMap.entrySet()) {
+ RequestResponseFuture future = entry.getValue();
+ future.getRequestCallback().onException(e);
+ }
+ }
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ assertThat(cc.get()).isEqualTo(1);
+ }
+
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..90e4623e9b486770b1740d090a2dc9f05aca51f5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RequestResponseFutureTest {
+
+ @Test
+ public void testExecuteRequestCallback() throws Exception {
+ final AtomicInteger cc = new AtomicInteger(0);
+ RequestResponseFuture future = new RequestResponseFuture(UUID.randomUUID().toString(), 3 * 1000L, new RequestCallback() {
+ @Override public void onSuccess(Message message) {
+ cc.incrementAndGet();
+ }
+
+ @Override public void onException(Throwable e) {
+ }
+ });
+ future.setSendReqeustOk(true);
+ future.executeRequestCallback();
+ assertThat(cc.get()).isEqualTo(1);
+ }
+
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..803e596fc829bc1cd666563f70bd35c7e824cc13
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+public class MessageUtilsTest {
+
+ @Test
+ public void testCreateReplyMessage() throws MQClientException {
+ Message msg = MessageUtil.createReplyMessage(createReplyMessage("clusterName"), new byte[] {'a'});
+ assertThat(msg.getTopic()).isEqualTo("clusterName" + "_" + MixAll.REPLY_TOPIC_POSTFIX);
+ assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT)).isEqualTo("127.0.0.1");
+ assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_TTL)).isEqualTo("3000");
+ }
+
+ @Test
+ public void testCreateReplyMessage_Exception() throws MQClientException {
+ try {
+ Message msg = MessageUtil.createReplyMessage(createReplyMessage(null), new byte[] {'a'});
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
+ }
+ }
+
+ @Test
+ public void testCreateReplyMessage_reqMsgIsNull() throws MQClientException {
+ try {
+ Message msg = MessageUtil.createReplyMessage(null, new byte[] {'a'});
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("create reply message fail, requestMessage cannot be null.");
+ }
+ }
+
+ @Test
+ public void testGetReplyToClient() throws MQClientException {
+ Message msg = createReplyMessage("clusterName");
+ String replyToClient = MessageUtil.getReplyToClient(msg);
+ assertThat(replyToClient).isNotNull();
+ assertThat(replyToClient).isEqualTo("127.0.0.1");
+ }
+
+ private Message createReplyMessage(String clusterName) {
+ Message requestMessage = new Message();
+ Map map = new HashMap();
+ map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1");
+ map.put(MessageConst.PROPERTY_CLUSTER, clusterName);
+ map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000");
+ MessageAccessor.setProperties(requestMessage, map);
+ return requestMessage;
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 1c3f37d00d61d2fe04630b6a427906544dd4d95d..a7568f0a2079a9f19ee666b362fd21f5a6db3c3f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -61,6 +61,7 @@ public class BrokerConfig {
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+ private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
private int adminBrokerThreadPoolNums = 16;
@@ -83,6 +84,7 @@ public class BrokerConfig {
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
+ private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
@@ -180,6 +182,8 @@ public class BrokerConfig {
@ImportantField
private boolean aclEnable = false;
+ private boolean storeReplyMessageEnable = true;
+
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
@@ -374,6 +378,14 @@ public class BrokerConfig {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}
+ public int getProcessReplyMessageThreadPoolNums() {
+ return processReplyMessageThreadPoolNums;
+ }
+
+ public void setProcessReplyMessageThreadPoolNums(int processReplyMessageThreadPoolNums) {
+ this.processReplyMessageThreadPoolNums = processReplyMessageThreadPoolNums;
+ }
+
public int getQueryMessageThreadPoolNums() {
return queryMessageThreadPoolNums;
}
@@ -470,6 +482,14 @@ public class BrokerConfig {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
+ public int getReplyThreadPoolQueueCapacity() {
+ return replyThreadPoolQueueCapacity;
+ }
+
+ public void setReplyThreadPoolQueueCapacity(int replyThreadPoolQueueCapacity) {
+ this.replyThreadPoolQueueCapacity = replyThreadPoolQueueCapacity;
+ }
+
public int getQueryThreadPoolQueueCapacity() {
return queryThreadPoolQueueCapacity;
}
@@ -749,7 +769,7 @@ public class BrokerConfig {
public void setMsgTraceTopicName(String msgTraceTopicName) {
this.msgTraceTopicName = msgTraceTopicName;
}
-
+
public boolean isTraceTopicEnable() {
return traceTopicEnable;
}
@@ -765,4 +785,12 @@ public class BrokerConfig {
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
+
+ public boolean isStoreReplyMessageEnable() {
+ return storeReplyMessageEnable;
+ }
+
+ public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) {
+ this.storeReplyMessageEnable = storeReplyMessageEnable;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index bba8b36ff5e172098573024e2acdb019c4e03f12..e9a67bb5c00c5491c60210eb918fc43ee7d06634 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -45,8 +45,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class MixAll {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
-
public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
@@ -74,27 +72,26 @@ public class MixAll {
public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL";
public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_";
-
public static final List LOCAL_INET_ADDRESS = getLocalInetAddress();
public static final String LOCALHOST = localhost();
public static final String DEFAULT_CHARSET = "UTF-8";
public static final long MASTER_ID = 0L;
public static final long CURRENT_JVM_PID = getPID();
-
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
-
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
+ public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
-
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
+ public static final String REPLY_MESSAGE_FLAG = "reply";
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
@@ -110,6 +107,10 @@ public class MixAll {
return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
}
+ public static String getReplyTopic(final String clusterName) {
+ return clusterName + "_" + REPLY_TOPIC_POSTFIX;
+ }
+
public static boolean isSysConsumerGroup(final String consumerGroup) {
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index aa8481643a88252731b195c68f3c6c7337dd1f5a..5bdc846562dc6d294f3a4e5cecc4d4bfd5a83099 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -45,6 +45,13 @@ public class MessageConst {
public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
+ public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID";
+ public static final String PROPERTY_MESSAGE_REPLY_TO_CLIENT = "REPLY_TO_CLIENT";
+ public static final String PROPERTY_MESSAGE_TTL = "TTL";
+ public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME";
+ public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
+ public static final String PROPERTY_CLUSTER = "CLUSTER";
+ public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
public static final String KEY_SEPARATOR = " ";
@@ -74,5 +81,12 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
+ STRING_HASH_SET.add(PROPERTY_CORRELATION_ID);
+ STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+ STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL);
+ STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
+ STRING_HASH_SET.add(PROPERTY_PUSH_REPLY_TIME);
+ STRING_HASH_SET.add(PROPERTY_CLUSTER);
+ STRING_HASH_SET.add(PROPERTY_MESSAGE_TYPE);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index dbdabbcde96a7a6b6d0dbe8781ba2fc66c77a923..b3009d738ff80a50de5878ac43815b6943313163 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -182,4 +182,10 @@ public class RequestCode {
* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before
*/
public static final int RESUME_CHECK_HALF_MESSAGE = 323;
+
+ public static final int SEND_REPLY_MESSAGE = 324;
+
+ public static final int SEND_REPLY_MESSAGE_V2 = 325;
+
+ public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
new file mode 100644
index 0000000000000000000000000000000000000000..3bb09073f722e85c8be9d5af131e42a4f3592074
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ReplyMessageRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String producerGroup;
+ @CFNotNull
+ private String topic;
+ @CFNotNull
+ private String defaultTopic;
+ @CFNotNull
+ private Integer defaultTopicQueueNums;
+ @CFNotNull
+ private Integer queueId;
+ @CFNotNull
+ private Integer sysFlag;
+ @CFNotNull
+ private Long bornTimestamp;
+ @CFNotNull
+ private Integer flag;
+ @CFNullable
+ private String properties;
+ @CFNullable
+ private Integer reconsumeTimes;
+ @CFNullable
+ private boolean unitMode = false;
+
+ @CFNotNull
+ private String bornHost;
+ @CFNotNull
+ private String storeHost;
+ @CFNotNull
+ private long storeTimestamp;
+
+ public void checkFields() throws RemotingCommandException {
+ }
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getDefaultTopic() {
+ return defaultTopic;
+ }
+
+ public void setDefaultTopic(String defaultTopic) {
+ this.defaultTopic = defaultTopic;
+ }
+
+ public Integer getDefaultTopicQueueNums() {
+ return defaultTopicQueueNums;
+ }
+
+ public void setDefaultTopicQueueNums(Integer defaultTopicQueueNums) {
+ this.defaultTopicQueueNums = defaultTopicQueueNums;
+ }
+
+ public Integer getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(Integer queueId) {
+ this.queueId = queueId;
+ }
+
+ public Integer getSysFlag() {
+ return sysFlag;
+ }
+
+ public void setSysFlag(Integer sysFlag) {
+ this.sysFlag = sysFlag;
+ }
+
+ public Long getBornTimestamp() {
+ return bornTimestamp;
+ }
+
+ public void setBornTimestamp(Long bornTimestamp) {
+ this.bornTimestamp = bornTimestamp;
+ }
+
+ public Integer getFlag() {
+ return flag;
+ }
+
+ public void setFlag(Integer flag) {
+ this.flag = flag;
+ }
+
+ public String getProperties() {
+ return properties;
+ }
+
+ public void setProperties(String properties) {
+ this.properties = properties;
+ }
+
+ public Integer getReconsumeTimes() {
+ return reconsumeTimes;
+ }
+
+ public void setReconsumeTimes(Integer reconsumeTimes) {
+ this.reconsumeTimes = reconsumeTimes;
+ }
+
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+ public void setUnitMode(boolean unitMode) {
+ this.unitMode = unitMode;
+ }
+
+ public String getBornHost() {
+ return bornHost;
+ }
+
+ public void setBornHost(String bornHost) {
+ this.bornHost = bornHost;
+ }
+
+ public String getStoreHost() {
+ return storeHost;
+ }
+
+ public void setStoreHost(String storeHost) {
+ this.storeHost = storeHost;
+ }
+
+ public long getStoreTimestamp() {
+ return storeTimestamp;
+ }
+
+ public void setStoreTimestamp(long storeTimestamp) {
+ this.storeTimestamp = storeTimestamp;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..86d1fd4d42dbf21db700df4ce84667c2d4980287
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.util.UUID;
+
+public class CorrelationIdUtil {
+ public static String createCorrelationId() {
+ return UUID.randomUUID().toString();
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..072291d5c2ec8be0e7bd176f9d8fa3dd8d36b2ae
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.RequestCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class AsyncRequestProducer {
+ private static final InternalLogger log = ClientLogger.getLog();
+
+ public static void main(String[] args) throws MQClientException, InterruptedException {
+ String producerGroup = "please_rename_unique_group_name";
+ String topic = "RequestTopic";
+ long ttl = 3000;
+
+ DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+ producer.start();
+
+ try {
+ Message msg = new Message(topic,
+ "",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+ long begin = System.currentTimeMillis();
+ producer.request(msg, new RequestCallback() {
+ @Override
+ public void onSuccess(Message message) {
+ long cost = System.currentTimeMillis() - begin;
+ System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ System.err.printf("request to <%s> fail.", topic);
+ }
+ }, ttl);
+ } catch (Exception e) {
+ log.warn("", e);
+ }
+ /* shutdown after your request callback is finished */
+// producer.shutdown();
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..b34908b84f34bfb10c7f16145d9a1fefd6295de6
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class RequestProducer {
+ public static void main(String[] args) throws MQClientException, InterruptedException {
+ String producerGroup = "please_rename_unique_group_name";
+ String topic = "RequestTopic";
+ long ttl = 3000;
+
+ DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+ producer.start();
+
+ try {
+ Message msg = new Message(topic,
+ "",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+ long begin = System.currentTimeMillis();
+ Message retMsg = producer.request(msg, ttl);
+ long cost = System.currentTimeMillis() - begin;
+ System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ producer.shutdown();
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..c62c7d4eb6da90529b7bb7b208b71bb83cd2a8ca
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ResponseConsumer {
+ public static void main(String[] args) throws InterruptedException, MQClientException {
+ String producerGroup = "please_rename_unique_group_name";
+ String consumerGroup = "please_rename_unique_group_name";
+ String topic = "RequestTopic";
+
+ // create a producer to send reply message
+ DefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);
+ replyProducer.start();
+
+ // create consumer
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+
+ // recommend client configs
+ consumer.setPullTimeDelayMillsWhenException(0L);
+
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ for (MessageExt msg : msgs) {
+ try {
+ System.out.printf("handle message: %s", msg.toString());
+ String replyTo = MessageUtil.getReplyToClient(msg);
+ byte[] replyContent = "reply message contents.".getBytes();
+ // create reply message with given util, do not create reply message by yourself
+ Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);
+
+ // send reply message with producer
+ SendResult replyResult = replyProducer.send(replyMessage, 3000);
+ System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
+ } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+
+ consumer.subscribe(topic, "*");
+ consumer.start();
+ System.out.printf("Consumer Started.%n");
+ }
+}