未验证 提交 e735fffe 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1422 from qqeasonchen/rocketmq-dev-rpc

[RIP-16]Support request/response pattern
......@@ -61,6 +61,7 @@ import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
......@@ -132,6 +133,7 @@ public class BrokerController {
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> replyThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
......@@ -147,6 +149,7 @@ public class BrokerController {
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService replyMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
......@@ -194,6 +197,7 @@ public class BrokerController {
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
......@@ -277,6 +281,14 @@ public class BrokerController {
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
......@@ -553,6 +565,17 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* ReplyMessageProcessor
*/
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
/**
* QueryMessageProcessor
*/
......@@ -763,6 +786,10 @@ public class BrokerController {
this.pullMessageExecutor.shutdown();
}
if (this.replyMessageExecutor != null) {
this.replyMessageExecutor.shutdown();
}
if (this.adminBrokerExecutor != null) {
this.adminBrokerExecutor.shutdown();
}
......
......@@ -17,17 +17,16 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -43,7 +42,9 @@ public class ProducerManager {
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public ProducerManager() {
}
......@@ -90,6 +91,7 @@ public class ProducerManager {
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
clientChannelTable.remove(info.getClientId());
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
......@@ -121,6 +123,7 @@ public class ProducerManager {
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
clientChannelTable.remove(clientChannelInfo.getClientId());
log.info(
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
......@@ -154,6 +157,7 @@ public class ProducerManager {
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
......@@ -179,6 +183,7 @@ public class ProducerManager {
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
clientChannelTable.remove(clientChannelInfo.getClientId());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
......@@ -231,4 +236,8 @@ public class ProducerManager {
}
return null;
}
public Channel findChannel(String clientId) {
return clientChannelTable.get(clientId);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public ReplyMessageProcessor(final BrokerController brokerController) {
super(brokerController);
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext = null;
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader);
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
@Override
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException {
SendMessageRequestHeaderV2 requestHeaderV2 = null;
SendMessageRequestHeader requestHeader = null;
switch (request.getCode()) {
case RequestCode.SEND_REPLY_MESSAGE_V2:
requestHeaderV2 =
(SendMessageRequestHeaderV2) request
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
case RequestCode.SEND_REPLY_MESSAGE:
if (null == requestHeaderV2) {
requestHeader =
(SendMessageRequestHeader) request
.decodeCommandCustomHeader(SendMessageRequestHeader.class);
} else {
requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
}
default:
break;
}
return requestHeader;
}
private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendReplyMessage request command, {}", request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner);
this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);
if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
}
return response;
}
private PushReplyResult pushReplyMessage(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader,
final Message msg) {
ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader();
replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString());
replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString());
replyMessageRequestHeader.setStoreTimestamp(System.currentTimeMillis());
replyMessageRequestHeader.setProducerGroup(requestHeader.getProducerGroup());
replyMessageRequestHeader.setTopic(requestHeader.getTopic());
replyMessageRequestHeader.setDefaultTopic(requestHeader.getDefaultTopic());
replyMessageRequestHeader.setDefaultTopicQueueNums(requestHeader.getDefaultTopicQueueNums());
replyMessageRequestHeader.setQueueId(requestHeader.getQueueId());
replyMessageRequestHeader.setSysFlag(requestHeader.getSysFlag());
replyMessageRequestHeader.setBornTimestamp(requestHeader.getBornTimestamp());
replyMessageRequestHeader.setFlag(requestHeader.getFlag());
replyMessageRequestHeader.setProperties(requestHeader.getProperties());
replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes());
replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader);
request.setBody(msg.getBody());
String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
PushReplyResult pushReplyResult = new PushReplyResult(false);
if (senderId != null) {
Channel channel = this.brokerController.getProducerManager().findChannel(senderId);
if (channel != null) {
msg.getProperties().put(MessageConst.PROPERTY_PUSH_REPLY_TIME, String.valueOf(System.currentTimeMillis()));
replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
try {
RemotingCommand pushResponse = this.brokerController.getBroker2Client().callClient(channel, request);
assert pushResponse != null;
switch (pushResponse.getCode()) {
case ResponseCode.SUCCESS: {
pushReplyResult.setPushOk(true);
break;
}
default: {
pushReplyResult.setPushOk(false);
pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
log.warn("push reply message to <{}> return fail, response remark: {}", senderId, pushResponse.getRemark());
}
}
} catch (RemotingException | InterruptedException e) {
pushReplyResult.setPushOk(false);
pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
log.warn("push reply message to <{}> fail. {}", senderId, channel, e);
}
} else {
pushReplyResult.setPushOk(false);
pushReplyResult.setRemark("push reply message fail, channel of <" + senderId + "> not found.");
log.warn(pushReplyResult.getRemark());
}
} else {
log.warn(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + " is null, can not reply message");
pushReplyResult.setPushOk(false);
pushReplyResult.setRemark("reply message properties[" + MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + "] is null");
}
return pushReplyResult;
}
private void handlePushReplyResult(PushReplyResult pushReplyResult, final RemotingCommand response,
final SendMessageResponseHeader responseHeader, int queueIdInt) {
if (!pushReplyResult.isPushOk()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(pushReplyResult.getRemark());
} else {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
//set to zore to avoid client decoding exception
responseHeader.setMsgId("0");
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(0L);
}
}
private void handlePutMessageResult(PutMessageResult putMessageResult,
final RemotingCommand request, final MessageExt msg,
final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext,
int queueIdInt) {
if (putMessageResult == null) {
log.warn("process reply message, store putMessage return null");
return;
}
boolean putOk = false;
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
putOk = true;
break;
// Failed
case CREATE_MAPEDFILE_FAILED:
log.info("create mapped file failed, server is busy or broken.");
break;
case MESSAGE_ILLEGAL:
log.info(
"the message is illegal, maybe msg properties length limit 32k.");
break;
case PROPERTIES_SIZE_EXCEEDED:
log.info(
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k.");
break;
case SERVICE_NOT_AVAILABLE:
log.info(
"service not available now, maybe disk full, maybe your broker machine memory too small.");
break;
case OS_PAGECACHE_BUSY:
log.info("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
break;
case UNKNOWN_ERROR:
log.info("UNKNOWN_ERROR");
break;
default:
log.info("UNKNOWN_ERROR DEFAULT");
break;
}
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (putOk) {
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
sendMessageContext.setQueueId(responseHeader.getQueueId());
sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
} else {
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
}
}
class PushReplyResult {
boolean pushOk;
String remark;
public PushReplyResult(boolean pushOk) {
this.pushOk = pushOk;
remark = "";
}
public boolean isPushOk() {
return pushOk;
}
public void setPushOk(boolean pushOk) {
this.pushOk = pushOk;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
}
}
......@@ -343,11 +343,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
......@@ -536,6 +538,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
......
......@@ -34,11 +34,11 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class TopicConfigManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......@@ -134,6 +134,14 @@ public class TopicConfigManager extends ConfigManager {
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
{
String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
public boolean isSystemTopic(final String topic) {
......
......@@ -20,6 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field;
import java.util.HashMap;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -42,14 +43,14 @@ public class ProducerManagerTest {
@Before
public void init() {
producerManager = new ProducerManager();
clientInfo = new ClientChannelInfo(channel);
clientInfo = new ClientChannelInfo(channel, "clientId", LanguageCode.JAVA, 0);
}
@Test
public void scanNotActiveChannel() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
assertThat(producerManager.findChannel("clientId")).isNotNull();
Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
field.setAccessible(true);
long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
......@@ -57,22 +58,28 @@ public class ProducerManagerTest {
when(channel.close()).thenReturn(mock(ChannelFuture.class));
producerManager.scanNotActiveChannel();
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
assertThat(producerManager.findChannel("clientId")).isNull();
}
@Test
public void doChannelCloseEvent() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
assertThat(producerManager.findChannel("clientId")).isNotNull();
producerManager.doChannelCloseEvent("127.0.0.1", channel);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
assertThat(producerManager.findChannel("clientId")).isNull();
}
@Test
public void testRegisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
Channel channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNotNull();
assertThat(channel1).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
assertThat(channel1).isEqualTo(channel);
}
@Test
......@@ -81,10 +88,15 @@ public class ProducerManagerTest {
HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
Channel channel1 = producerManager.findChannel("clientId");
assertThat(channel1).isNotNull();
assertThat(channel1).isEqualTo(channel);
producerManager.unregisterProducer(group, clientInfo);
channelMap = producerManager.getGroupChannelTable().get(group);
channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNull();
assertThat(channel1).isNull();
}
@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ReplyMessageProcessorTest {
private ReplyMessageProcessor replyMessageProcessor;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
@Mock
private MessageStore messageStore;
@Mock
private Channel channel;
private String topic = "FooBar";
private String group = "FooBarGroup";
private ClientChannelInfo clientInfo;
@Mock
private Broker2Client broker2Client;
@Before
public void init() throws IllegalAccessException, NoSuchFieldException {
clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0);
brokerController.setMessageStore(messageStore);
Field field = BrokerController.class.getDeclaredField("broker2Client");
field.setAccessible(true);
field.set(brokerController, broker2Client);
when(messageStore.now()).thenReturn(System.currentTimeMillis());
Channel mockChannel = mock(Channel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
replyMessageProcessor = new ReplyMessageProcessor(brokerController);
}
@Test
public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
brokerController.getProducerManager().registerProducer(group, clientInfo);
final RemotingCommand request = createSendMessageRequestHeaderCommand(RequestCode.SEND_REPLY_MESSAGE);
when(brokerController.getBroker2Client().callClient(any(Channel.class), any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS, request));
RemotingCommand responseToReturn = replyMessageProcessor.processRequest(handlerContext, request);
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
}
private RemotingCommand createSendMessageRequestHeaderCommand(int requestCode) {
SendMessageRequestHeader requestHeader = createSendMessageRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.setBody(new byte[] {'a'});
request.makeCustomHeaderToNet();
return request;
}
private SendMessageRequestHeader createSendMessageRequestHeader() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(group);
requestHeader.setTopic(topic);
requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
requestHeader.setDefaultTopicQueueNums(3);
requestHeader.setQueueId(1);
requestHeader.setSysFlag(0);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(124);
requestHeader.setReconsumeTimes(0);
Map<String, String> map = new HashMap<String, String>();
map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));
return requestHeader;
}
private RemotingCommand createResponse(int code, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setCode(code);
response.setOpaque(request.getOpaque());
return response;
}
}
\ No newline at end of file
......@@ -53,6 +53,7 @@ public class ClientConfig {
* Offset persistent interval for consumer
*/
private int persistConsumerOffsetInterval = 1000 * 5;
private long pullTimeDelayMillsWhenException = 1000;
private boolean unitMode = false;
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
......@@ -148,6 +149,7 @@ public class ClientConfig {
this.pollNameServerInterval = cc.pollNameServerInterval;
this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
this.pullTimeDelayMillsWhenException = cc.pullTimeDelayMillsWhenException;
this.unitMode = cc.unitMode;
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
......@@ -165,6 +167,7 @@ public class ClientConfig {
cc.pollNameServerInterval = pollNameServerInterval;
cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
cc.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
cc.unitMode = unitMode;
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
......@@ -222,6 +225,14 @@ public class ClientConfig {
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
}
public long getPullTimeDelayMillsWhenException() {
return pullTimeDelayMillsWhenException;
}
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
public String getUnitName() {
return unitName;
}
......@@ -287,12 +298,13 @@ public class ClientConfig {
this.accessChannel = accessChannel;
}
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+ ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
}
}
......@@ -23,4 +23,6 @@ public class ClientErrorCode {
public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
public static final int NO_NAME_SERVER_EXCEPTION = 10004;
public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
public static final int REQUEST_TIMEOUT_EXCEPTION = 10006;
public static final int CREATE_REPLY_MESSAGE_EXCEPTION = 10007;
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.exception;
import org.apache.rocketmq.common.UtilAll;
public class RequestTimeoutException extends Exception {
private static final long serialVersionUID = -5758410930844185841L;
private int responseCode;
private String errorMessage;
public RequestTimeoutException(String errorMessage, Throwable cause) {
super(errorMessage, cause);
this.responseCode = -1;
this.errorMessage = errorMessage;
}
public RequestTimeoutException(int responseCode, String errorMessage) {
super("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ errorMessage);
this.responseCode = responseCode;
this.errorMessage = errorMessage;
}
public int getResponseCode() {
return responseCode;
}
public RequestTimeoutException setResponseCode(final int responseCode) {
this.responseCode = responseCode;
return this;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(final String errorMessage) {
this.errorMessage = errorMessage;
}
}
......@@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.client.impl;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.RequestFutureTable;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
......@@ -42,14 +45,16 @@ import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRe
import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ClientRemotingProcessor implements NettyRequestProcessor {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
......@@ -76,6 +81,9 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
return this.receiveReplyMessage(ctx, request);
default:
break;
}
......@@ -213,4 +221,73 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand receiveReplyMessage(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
long receiveTime = System.currentTimeMillis();
ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
try {
MessageExt msg = new MessageExt();
msg.setTopic(requestHeader.getTopic());
msg.setQueueId(requestHeader.getQueueId());
msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
if (requestHeader.getBornHost() != null) {
msg.setBornHost(RemotingUtil.string2SocketAddress(requestHeader.getBornHost()));
}
if (requestHeader.getStoreHost() != null) {
msg.setStoreHost(RemotingUtil.string2SocketAddress(requestHeader.getStoreHost()));
}
byte[] body = request.getBody();
if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
try {
body = UtilAll.uncompress(body);
} catch (IOException e) {
log.warn("err when uncompress constant", e);
}
}
msg.setBody(body);
msg.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
msg.setBornTimestamp(requestHeader.getBornTimestamp());
msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
log.debug("receive reply message :{}", msg);
processReplyMessage(msg);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} catch (Exception e) {
log.warn("unknown err when receiveReplyMsg", e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("process reply message fail");
}
return response;
}
private void processReplyMessage(MessageExt replyMsg) {
final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
if (requestResponseFuture != null) {
requestResponseFuture.putResponseMessage(replyMsg);
RequestFutureTable.getRequestFutureTable().remove(correlationId);
if (requestResponseFuture.getRequestCallback() != null) {
requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
} else {
requestResponseFuture.putResponseMessage(replyMsg);
}
} else {
String bornHost = replyMsg.getBornHostString();
log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",
correlationId, bornHost));
}
}
}
......@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
......@@ -201,6 +200,8 @@ public class MQClientAPIImpl {
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
public List<String> getNameServerAddressList() {
......@@ -304,8 +305,8 @@ public class MQClientAPIImpl {
requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),","));
requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),","));
requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ","));
requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ","));
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
......@@ -344,7 +345,7 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis)
public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader();
......@@ -366,7 +367,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,
final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null);
......@@ -445,13 +447,23 @@ public class MQClientAPIImpl {
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
switch (communicationMode) {
......
......@@ -106,7 +106,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
/**
* Delay some time when exception occur
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000;
private long pullTimeDelayMillsWhenException = 1000;
/**
* Flow control interval
*/
......@@ -156,6 +156,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return new Thread(r, "MonitorMessageQueueChangeThread");
}
});
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
private void checkServiceState() {
......@@ -783,7 +784,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Throwable e) {
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
pullDelayTimeMills = pullTimeDelayMillsWhenException;
log.error("An error occurred in pull message process.", e);
}
......@@ -1070,4 +1071,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
}
......@@ -83,7 +83,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
* Delay some time when exception occur
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
private long pullTimeDelayMillsWhenException = 3000;
/**
* Flow control interval
*/
......@@ -115,6 +115,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
......@@ -222,7 +223,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
......@@ -282,7 +283,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
......@@ -290,7 +291,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
......@@ -397,7 +398,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
......@@ -444,7 +445,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
......@@ -1168,4 +1169,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.consumeMessageService = consumeMessageService;
}
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
}
......@@ -24,6 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -39,6 +41,7 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
......@@ -52,6 +55,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.client.producer.RequestFutureTable;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
......@@ -79,6 +85,7 @@ import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHe
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.utils.CorrelationIdUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
......@@ -95,17 +102,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private final Timer timer = new Timer("RequestHouseKeepingService", true);
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
......@@ -212,6 +218,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
private void checkConfig() throws MQClientException {
......@@ -1325,6 +1342,233 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
public Message request(Message msg,
long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}
public void request(Message msg, final RequestCallback requestCallback, long timeout)
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(correlationId);
}
}, timeout - cost);
}
public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException, RequestTimeoutException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}
public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout)
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(correlationId);
}
}, timeout - cost);
}
public Message request(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, null, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(correlationId);
}
}, null, timeout - cost);
}
private void requestFail(final String correlationId) {
RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId);
if (responseFuture != null) {
responseFuture.setSendReqeustOk(false);
responseFuture.putResponseMessage(null);
try {
responseFuture.executeRequestCallback();
} catch (Exception e) {
log.warn("execute requestCallback in requestFail, and callback throw", e);
}
}
}
private void prepareSendRequest(final Message msg, long timeout) {
String correlationId = CorrelationIdUtil.createCorrelationId();
String requestClientId = this.getmQClientFactory().getClientId();
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestClientId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
if (!hasRouteData) {
long beginTimestamp = System.currentTimeMillis();
this.tryToFindTopicPublishInfo(msg.getTopic());
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
long cost = System.currentTimeMillis() - beginTimestamp;
if (cost > 500) {
log.warn("prepare send request for <{}> cost {} ms", msg.getTopic(), cost);
}
}
}
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}
......
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
......@@ -42,38 +43,29 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* This class is the entry point for applications intending to send messages.
* </p>
* This class is the entry point for applications intending to send messages. </p>
*
* It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
* box for most scenarios.
* </p>
* box for most scenarios. </p>
*
* This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and
* cons; you'd better understand strengths and weakness of them before actually coding.
* </p>
* cons; you'd better understand strengths and weakness of them before actually coding. </p>
*
* <p>
* <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
* and used among multiple threads context.
* </p>
* <p> <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
* and used among multiple threads context. </p>
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
private final InternalLogger log = ClientLogger.getLog();
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved.
* </p>
* important when transactional messages are involved. </p>
*
* For non-transactional messages, it does not matter as long as it's unique per process.
* </p>
* For non-transactional messages, it does not matter as long as it's unique per process. </p>
*
* See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
*/
......@@ -100,16 +92,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* </p>
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
* </p>
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
......@@ -268,14 +258,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Start this producer instance.
* </p>
* Start this producer instance. </p>
*
* <strong>
* Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
* this method before sending or querying messages.
* </strong>
* </p>
* <strong> Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must
* to invoke this method before sending or querying messages. </strong> </p>
*
* @throws MQClientException if there is any unexpected error.
*/
......@@ -316,8 +302,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Send message in synchronous mode. This method returns only when the sending procedure totally completes.
* </p>
* Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>
*
* <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
......@@ -359,11 +344,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Send message to broker asynchronously.
* </p>
* Send message to broker asynchronously. </p>
*
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
* </p>
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p>
*
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
......@@ -582,6 +565,133 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
/**
* Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message. </p>
*
* <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
* delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
*
* @param msg request message to send
* @param timeout request timeout
* @return reply message
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.request(msg, timeout);
}
/**
* Request asynchronously. </p>
* This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p>
*
* Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
* application developers are the one to resolve this potential issue.
*
* @param msg request message to send
* @param requestCallback callback to execute on request completion.
* @param timeout request timeout
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, requestCallback, timeout);
}
/**
* Same to {@link #request(Message, long)} with message queue selector specified.
*
* @param msg request message to send
* @param selector message queue selector, through which we get target message queue to deliver message to.
* @param arg argument to work along with message queue selector.
* @param timeout timeout of request.
* @return reply message
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException, RequestTimeoutException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.request(msg, selector, arg, timeout);
}
/**
* Same to {@link #request(Message, RequestCallback, long)} with target message selector specified.
*
* @param msg requst message to send
* @param selector message queue selector, through which we get target message queue to deliver message to.
* @param arg argument to work along with message queue selector.
* @param requestCallback callback to execute on request completion.
* @param timeout timeout of request.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout);
}
/**
* Same to {@link #request(Message, long)} with target message queue specified in addition.
*
* @param msg request message to send
* @param mq target message queue.
* @param timeout request timeout
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
public Message request(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.request(msg, mq, timeout);
}
/**
* Same to {@link #request(Message, RequestCallback, long)} with target message queue specified.
*
* @param msg request message to send
* @param mq target message queue.
* @param requestCallback callback to execute on request completion.
* @param timeout timeout of request.
* @throws MQClientException if there is any client error.
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout);
}
/**
* Same to {@link #sendOneway(Message)} with message queue selector specified.
*
......
......@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -98,4 +99,26 @@ public interface MQProducer extends MQAdmin {
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
//for rpc
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback,
final long timeout) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueue mq, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
public interface RequestCallback {
void onSuccess(final Message message);
void onException(final Throwable e);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
public class RequestFutureTable {
private static InternalLogger log = ClientLogger.getLog();
private static ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
public static ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
return requestFutureTable;
}
public static void scanExpiredRequest() {
final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, RequestResponseFuture> next = it.next();
RequestResponseFuture rep = next.getValue();
if (rep.isTimeout()) {
it.remove();
rfList.add(rep);
log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
}
}
for (RequestResponseFuture rf : rfList) {
try {
Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
rf.setCause(cause);
rf.executeRequestCallback();
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.Message;
public class RequestResponseFuture {
private final String correlationId;
private final RequestCallback requestCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final Message requestMsg = null;
private long timeoutMillis;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile Message responseMsg = null;
private volatile boolean sendRequestOk = true;
private volatile Throwable cause = null;
public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) {
this.correlationId = correlationId;
this.timeoutMillis = timeoutMillis;
this.requestCallback = requestCallback;
}
public void executeRequestCallback() {
if (requestCallback != null) {
if (sendRequestOk && cause == null) {
requestCallback.onSuccess(responseMsg);
} else {
requestCallback.onException(cause);
}
}
}
public boolean isTimeout() {
long diff = System.currentTimeMillis() - this.beginTimestamp;
return diff > this.timeoutMillis;
}
public Message waitResponseMessage(final long timeout) throws InterruptedException {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.responseMsg;
}
public void putResponseMessage(final Message responseMsg) {
this.responseMsg = responseMsg;
this.countDownLatch.countDown();
}
public String getCorrelationId() {
return correlationId;
}
public long getTimeoutMillis() {
return timeoutMillis;
}
public void setTimeoutMillis(long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
public RequestCallback getRequestCallback() {
return requestCallback;
}
public long getBeginTimestamp() {
return beginTimestamp;
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public Message getResponseMsg() {
return responseMsg;
}
public void setResponseMsg(Message responseMsg) {
this.responseMsg = responseMsg;
}
public boolean isSendRequestOk() {
return sendRequestOk;
}
public void setSendReqeustOk(boolean sendReqeustOk) {
this.sendRequestOk = sendReqeustOk;
}
public Message getRequestMsg() {
return requestMsg;
}
public Throwable getCause() {
return cause;
}
public void setCause(Throwable cause) {
this.cause = cause;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.utils;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
public class MessageUtil {
public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {
if (requestMessage != null) {
Message replyMessage = new Message();
String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);
String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);
replyMessage.setBody(body);
if (cluster != null) {
String replyTopic = MixAll.getReplyTopic(cluster);
replyMessage.setTopic(replyTopic);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);
return replyMessage;
} else {
throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
}
}
throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");
}
public static String getReplyToClient(final Message msg) {
return msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
}
}
......@@ -42,6 +42,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
......@@ -164,7 +165,7 @@ public class MQClientAPIImplTest {
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
......@@ -289,6 +290,7 @@ public class MQClientAPIImplTest {
assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
}
}
@Test
public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
doAnswer(new Answer() {
......@@ -322,6 +324,38 @@ public class MQClientAPIImplTest {
assertThat(result).isEqualTo(true);
}
@Test
public void testSendMessageTypeofReply() throws Exception {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
}
}).when(remotingClient).invokeAsync(Matchers.anyString(), Matchers.any(RemotingCommand.class), Matchers.anyLong(), Matchers.any(InvokeCallback.class));
SendMessageContext sendMessageContext = new SendMessageContext();
sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
msg.getProperties().put("MSG_TYPE", "reply");
mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
}
@Override
public void onException(Throwable e) {
}
}, null, null, 0, sendMessageContext, defaultMQProducerImpl);
}
private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
......
......@@ -21,15 +21,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
......@@ -45,6 +48,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
......@@ -337,6 +341,101 @@ public class DefaultMQProducerTest {
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
@Test
public void testRequestMessage() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final AtomicBoolean finish = new AtomicBoolean(false);
new Thread(new Runnable() {
@Override public void run() {
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
assertThat(responseMap).isNotNull();
while (!finish.get()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
RequestResponseFuture future = entry.getValue();
future.putResponseMessage(message);
}
}
}
}).start();
Message result = producer.request(message, 3 * 1000L);
finish.getAndSet(true);
assertThat(result.getTopic()).isEqualTo("FooBar");
assertThat(result.getBody()).isEqualTo(new byte[] {'a'});
}
@Test(expected = RequestTimeoutException.class)
public void testRequestMessage_RequestTimeoutException() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
Message result = producer.request(message, 3 * 1000L);
}
@Test
public void testAsyncRequest_OnSuccess() throws Exception {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
RequestCallback requestCallback = new RequestCallback() {
@Override public void onSuccess(Message message) {
assertThat(message.getTopic()).isEqualTo("FooBar");
assertThat(message.getBody()).isEqualTo(new byte[] {'a'});
assertThat(message.getFlag()).isEqualTo(1);
countDownLatch.countDown();
}
@Override public void onException(Throwable e) {
}
};
producer.request(message, requestCallback, 3 * 1000L);
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
assertThat(responseMap).isNotNull();
for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
RequestResponseFuture future = entry.getValue();
future.setSendReqeustOk(true);
message.setFlag(1);
future.getRequestCallback().onSuccess(message);
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
}
@Test
public void testAsyncRequest_OnException() throws Exception {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);
RequestCallback requestCallback = new RequestCallback() {
@Override public void onSuccess(Message message) {
}
@Override public void onException(Throwable e) {
cc.incrementAndGet();
countDownLatch.countDown();
}
};
MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
};
try {
producer.request(message, requestCallback, 3 * 1000L);
failBecauseExceptionWasNotThrown(Exception.class);
} catch (Exception e) {
ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
assertThat(responseMap).isNotNull();
for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
RequestResponseFuture future = entry.getValue();
future.getRequestCallback().onException(e);
}
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(1);
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class RequestResponseFutureTest {
@Test
public void testExecuteRequestCallback() throws Exception {
final AtomicInteger cc = new AtomicInteger(0);
RequestResponseFuture future = new RequestResponseFuture(UUID.randomUUID().toString(), 3 * 1000L, new RequestCallback() {
@Override public void onSuccess(Message message) {
cc.incrementAndGet();
}
@Override public void onException(Throwable e) {
}
});
future.setSendReqeustOk(true);
future.executeRequestCallback();
assertThat(cc.get()).isEqualTo(1);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.utils;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
public class MessageUtilsTest {
@Test
public void testCreateReplyMessage() throws MQClientException {
Message msg = MessageUtil.createReplyMessage(createReplyMessage("clusterName"), new byte[] {'a'});
assertThat(msg.getTopic()).isEqualTo("clusterName" + "_" + MixAll.REPLY_TOPIC_POSTFIX);
assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT)).isEqualTo("127.0.0.1");
assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_TTL)).isEqualTo("3000");
}
@Test
public void testCreateReplyMessage_Exception() throws MQClientException {
try {
Message msg = MessageUtil.createReplyMessage(createReplyMessage(null), new byte[] {'a'});
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
}
}
@Test
public void testCreateReplyMessage_reqMsgIsNull() throws MQClientException {
try {
Message msg = MessageUtil.createReplyMessage(null, new byte[] {'a'});
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("create reply message fail, requestMessage cannot be null.");
}
}
@Test
public void testGetReplyToClient() throws MQClientException {
Message msg = createReplyMessage("clusterName");
String replyToClient = MessageUtil.getReplyToClient(msg);
assertThat(replyToClient).isNotNull();
assertThat(replyToClient).isEqualTo("127.0.0.1");
}
private Message createReplyMessage(String clusterName) {
Message requestMessage = new Message();
Map map = new HashMap<String, String>();
map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1");
map.put(MessageConst.PROPERTY_CLUSTER, clusterName);
map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000");
MessageAccessor.setProperties(requestMessage, map);
return requestMessage;
}
}
......@@ -61,6 +61,7 @@ public class BrokerConfig {
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
private int adminBrokerThreadPoolNums = 16;
......@@ -83,6 +84,7 @@ public class BrokerConfig {
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
......@@ -180,6 +182,8 @@ public class BrokerConfig {
@ImportantField
private boolean aclEnable = false;
private boolean storeReplyMessageEnable = true;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
......@@ -374,6 +378,14 @@ public class BrokerConfig {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}
public int getProcessReplyMessageThreadPoolNums() {
return processReplyMessageThreadPoolNums;
}
public void setProcessReplyMessageThreadPoolNums(int processReplyMessageThreadPoolNums) {
this.processReplyMessageThreadPoolNums = processReplyMessageThreadPoolNums;
}
public int getQueryMessageThreadPoolNums() {
return queryMessageThreadPoolNums;
}
......@@ -470,6 +482,14 @@ public class BrokerConfig {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
public int getReplyThreadPoolQueueCapacity() {
return replyThreadPoolQueueCapacity;
}
public void setReplyThreadPoolQueueCapacity(int replyThreadPoolQueueCapacity) {
this.replyThreadPoolQueueCapacity = replyThreadPoolQueueCapacity;
}
public int getQueryThreadPoolQueueCapacity() {
return queryThreadPoolQueueCapacity;
}
......@@ -765,4 +785,12 @@ public class BrokerConfig {
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
public boolean isStoreReplyMessageEnable() {
return storeReplyMessageEnable;
}
public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) {
this.storeReplyMessageEnable = storeReplyMessageEnable;
}
}
......@@ -45,8 +45,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class MixAll {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
......@@ -74,27 +72,26 @@ public class MixAll {
public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL";
public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_";
public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress();
public static final String LOCALHOST = localhost();
public static final String DEFAULT_CHARSET = "UTF-8";
public static final long MASTER_ID = 0L;
public static final long CURRENT_JVM_PID = getPID();
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static final String REPLY_MESSAGE_FLAG = "reply";
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
......@@ -110,6 +107,10 @@ public class MixAll {
return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
}
public static String getReplyTopic(final String clusterName) {
return clusterName + "_" + REPLY_TOPIC_POSTFIX;
}
public static boolean isSysConsumerGroup(final String consumerGroup) {
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
}
......
......@@ -45,6 +45,13 @@ public class MessageConst {
public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID";
public static final String PROPERTY_MESSAGE_REPLY_TO_CLIENT = "REPLY_TO_CLIENT";
public static final String PROPERTY_MESSAGE_TTL = "TTL";
public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME";
public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
public static final String PROPERTY_CLUSTER = "CLUSTER";
public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
public static final String KEY_SEPARATOR = " ";
......@@ -74,5 +81,12 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
STRING_HASH_SET.add(PROPERTY_CORRELATION_ID);
STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT);
STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL);
STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
STRING_HASH_SET.add(PROPERTY_PUSH_REPLY_TIME);
STRING_HASH_SET.add(PROPERTY_CLUSTER);
STRING_HASH_SET.add(PROPERTY_MESSAGE_TYPE);
}
}
......@@ -182,4 +182,10 @@ public class RequestCode {
* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before
*/
public static final int RESUME_CHECK_HALF_MESSAGE = 323;
public static final int SEND_REPLY_MESSAGE = 324;
public static final int SEND_REPLY_MESSAGE_V2 = 325;
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ReplyMessageRequestHeader implements CommandCustomHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
private String topic;
@CFNotNull
private String defaultTopic;
@CFNotNull
private Integer defaultTopicQueueNums;
@CFNotNull
private Integer queueId;
@CFNotNull
private Integer sysFlag;
@CFNotNull
private Long bornTimestamp;
@CFNotNull
private Integer flag;
@CFNullable
private String properties;
@CFNullable
private Integer reconsumeTimes;
@CFNullable
private boolean unitMode = false;
@CFNotNull
private String bornHost;
@CFNotNull
private String storeHost;
@CFNotNull
private long storeTimestamp;
public void checkFields() throws RemotingCommandException {
}
public String getProducerGroup() {
return producerGroup;
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getDefaultTopic() {
return defaultTopic;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public Integer getDefaultTopicQueueNums() {
return defaultTopicQueueNums;
}
public void setDefaultTopicQueueNums(Integer defaultTopicQueueNums) {
this.defaultTopicQueueNums = defaultTopicQueueNums;
}
public Integer getQueueId() {
return queueId;
}
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
public Integer getSysFlag() {
return sysFlag;
}
public void setSysFlag(Integer sysFlag) {
this.sysFlag = sysFlag;
}
public Long getBornTimestamp() {
return bornTimestamp;
}
public void setBornTimestamp(Long bornTimestamp) {
this.bornTimestamp = bornTimestamp;
}
public Integer getFlag() {
return flag;
}
public void setFlag(Integer flag) {
this.flag = flag;
}
public String getProperties() {
return properties;
}
public void setProperties(String properties) {
this.properties = properties;
}
public Integer getReconsumeTimes() {
return reconsumeTimes;
}
public void setReconsumeTimes(Integer reconsumeTimes) {
this.reconsumeTimes = reconsumeTimes;
}
public boolean isUnitMode() {
return unitMode;
}
public void setUnitMode(boolean unitMode) {
this.unitMode = unitMode;
}
public String getBornHost() {
return bornHost;
}
public void setBornHost(String bornHost) {
this.bornHost = bornHost;
}
public String getStoreHost() {
return storeHost;
}
public void setStoreHost(String storeHost) {
this.storeHost = storeHost;
}
public long getStoreTimestamp() {
return storeTimestamp;
}
public void setStoreTimestamp(long storeTimestamp) {
this.storeTimestamp = storeTimestamp;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.utils;
import java.util.UUID;
public class CorrelationIdUtil {
public static String createCorrelationId() {
return UUID.randomUUID().toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.rpc;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncRequestProducer {
private static final InternalLogger log = ClientLogger.getLog();
public static void main(String[] args) throws MQClientException, InterruptedException {
String producerGroup = "please_rename_unique_group_name";
String topic = "RequestTopic";
long ttl = 3000;
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.start();
try {
Message msg = new Message(topic,
"",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
long begin = System.currentTimeMillis();
producer.request(msg, new RequestCallback() {
@Override
public void onSuccess(Message message) {
long cost = System.currentTimeMillis() - begin;
System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);
}
@Override
public void onException(Throwable e) {
System.err.printf("request to <%s> fail.", topic);
}
}, ttl);
} catch (Exception e) {
log.warn("", e);
}
/* shutdown after your request callback is finished */
// producer.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.rpc;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RequestProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
String producerGroup = "please_rename_unique_group_name";
String topic = "RequestTopic";
long ttl = 3000;
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.start();
try {
Message msg = new Message(topic,
"",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
long begin = System.currentTimeMillis();
Message retMsg = producer.request(msg, ttl);
long cost = System.currentTimeMillis() - begin;
System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.rpc;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class ResponseConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
String producerGroup = "please_rename_unique_group_name";
String consumerGroup = "please_rename_unique_group_name";
String topic = "RequestTopic";
// create a producer to send reply message
DefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);
replyProducer.start();
// create consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// recommend client configs
consumer.setPullTimeDelayMillsWhenException(0L);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt msg : msgs) {
try {
System.out.printf("handle message: %s", msg.toString());
String replyTo = MessageUtil.getReplyToClient(msg);
byte[] replyContent = "reply message contents.".getBytes();
// create reply message with given util, do not create reply message by yourself
Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);
// send reply message with producer
SendResult replyResult = replyProducer.send(replyMessage, 3000);
System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.subscribe(topic, "*");
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册