From c6c699ebf616c3e0199a9b71a7a441018b0e4757 Mon Sep 17 00:00:00 2001 From: qqeasonchen Date: Tue, 27 Aug 2019 17:33:36 +0800 Subject: [PATCH] [RIP-16]impl rpc support --- .../rocketmq/broker/BrokerController.java | 28 ++ .../broker/client/ProducerManager.java | 10 + .../processor/ReplyMessageProcessor.java | 292 ++++++++++++++++++ .../processor/SendMessageProcessor.java | 6 +- .../broker/topic/TopicConfigManager.java | 8 + .../client/common/ClientErrorCode.java | 1 + .../exception/RequestTimeoutException.java | 40 +++ .../client/impl/ClientRemotingProcessor.java | 88 +++++- .../rocketmq/client/impl/MQClientAPIImpl.java | 22 +- .../impl/producer/DefaultMQProducerImpl.java | 258 ++++++++++++++++ .../client/producer/DefaultMQProducer.java | 106 +++++++ .../rocketmq/client/producer/MQProducer.java | 22 ++ .../client/producer/RequestCallback.java | 9 + .../client/producer/RequestFutureTable.java | 46 +++ .../producer/RequestResponseFuture.java | 117 +++++++ .../rocketmq/client/utils/MessageUtil.java | 30 ++ .../apache/rocketmq/common/BrokerConfig.java | 28 ++ .../org/apache/rocketmq/common/MixAll.java | 7 + .../rocketmq/common/message/MessageConst.java | 14 + .../rocketmq/common/protocol/RequestCode.java | 6 + .../header/ReplyMessageRequestHeader.java | 153 +++++++++ .../rocketmq/common/utils/RequestIdUtil.java | 9 + 22 files changed, 1290 insertions(+), 10 deletions(-) create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java create mode 100644 client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java create mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java create mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java create mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java create mode 100644 client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index a885cd08..84b3e024 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -62,6 +62,7 @@ import org.apache.rocketmq.broker.processor.EndTransactionProcessor; import org.apache.rocketmq.broker.processor.PullMessageProcessor; import org.apache.rocketmq.broker.processor.QueryMessageProcessor; import org.apache.rocketmq.broker.processor.SendMessageProcessor; +import org.apache.rocketmq.broker.processor.ReplyMessageProcessor; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; @@ -132,6 +133,7 @@ public class BrokerController { private final SlaveSynchronize slaveSynchronize; private final BlockingQueue sendThreadPoolQueue; private final BlockingQueue pullThreadPoolQueue; + private final BlockingQueue replyThreadPoolQueue; private final BlockingQueue queryThreadPoolQueue; private final BlockingQueue clientManagerThreadPoolQueue; private final BlockingQueue heartbeatThreadPoolQueue; @@ -147,6 +149,7 @@ public class BrokerController { private TopicConfigManager topicConfigManager; private ExecutorService sendMessageExecutor; private ExecutorService pullMessageExecutor; + private ExecutorService replyMessageExecutor; private ExecutorService queryMessageExecutor; private ExecutorService adminBrokerExecutor; private ExecutorService clientManageExecutor; @@ -194,6 +197,7 @@ public class BrokerController { this.sendThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity()); this.pullThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getPullThreadPoolQueueCapacity()); + this.replyThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getReplyThreadPoolQueueCapacity()); this.queryThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); @@ -277,6 +281,14 @@ public class BrokerController { this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); + this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.replyThreadPoolQueue, + new ThreadFactoryImpl("ProcessReplyMessageThread_")); + this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), @@ -553,6 +565,18 @@ public class BrokerController { this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); + /** + * ReplyMessageProcessor + */ + ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this); + replyMessageProcessor.registerSendMessageHook(sendMessageHookList); + + this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); + + /** * QueryMessageProcessor */ @@ -763,6 +787,10 @@ public class BrokerController { this.pullMessageExecutor.shutdown(); } + if (this.replyMessageExecutor != null) { + this.replyMessageExecutor.shutdown(); + } + if (this.adminBrokerExecutor != null) { this.adminBrokerExecutor.shutdown(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 61ceae53..db372fa8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +44,7 @@ public class ProducerManager { private final Lock groupChannelLock = new ReentrantLock(); private final HashMap> groupChannelTable = new HashMap>(); + private final ConcurrentHashMap clientChannelTable = new ConcurrentHashMap<>(); private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter(); public ProducerManager() { } @@ -82,6 +84,7 @@ public class ProducerManager { long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp(); if (diff > CHANNEL_EXPIRED_TIMEOUT) { it.remove(); + clientChannelTable.remove(info.getClientId()); log.warn( "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); @@ -113,6 +116,7 @@ public class ProducerManager { final ClientChannelInfo clientChannelInfo = clientChannelInfoTable.remove(channel); if (clientChannelInfo != null) { + clientChannelTable.remove(clientChannelInfo.getClientId()); log.info( "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", clientChannelInfo.toString(), remoteAddr, group); @@ -146,6 +150,7 @@ public class ProducerManager { clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); if (null == clientChannelInfoFound) { channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); + clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel()); log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString()); } @@ -171,6 +176,7 @@ public class ProducerManager { HashMap channelTable = this.groupChannelTable.get(group); if (null != channelTable && !channelTable.isEmpty()) { ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); + clientChannelTable.remove(clientChannelInfo.getClientId()); if (old != null) { log.info("unregister a producer[{}] from groupChannelTable {}", group, clientChannelInfo.toString()); @@ -223,4 +229,8 @@ public class ProducerManager { } return null; } + + public Channel findChannel(String clientId) { + return clientChannelTable.get(clientId); + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java new file mode 100644 index 00000000..00960d79 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -0,0 +1,292 @@ +package org.apache.rocketmq.broker.processor; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.mqtrace.SendMessageContext; +import org.apache.rocketmq.common.*; +import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.*; +import org.apache.rocketmq.store.stats.BrokerStatsManager; + +public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { + + public ReplyMessageProcessor(final BrokerController brokerController) { + super(brokerController); + } + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + SendMessageContext mqtraceContext = null; + SendMessageRequestHeader requestHeader = parseRequestHeader(request); + if (requestHeader == null) { + return null; + } + + mqtraceContext = buildMsgContext(ctx, requestHeader); + this.executeSendMessageHookBefore(ctx, request, mqtraceContext); + + RemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader); + + this.executeSendMessageHookAfter(response, mqtraceContext); + return response; + } + + @Override + protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException { + SendMessageRequestHeaderV2 requestHeaderV2 = null; + SendMessageRequestHeader requestHeader = null; + switch (request.getCode()) { + case RequestCode.SEND_REPLY_MESSAGE_V2: + requestHeaderV2 = + (SendMessageRequestHeaderV2) request + .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + case RequestCode.SEND_REPLY_MESSAGE: + if (null == requestHeaderV2) { + requestHeader = + (SendMessageRequestHeader) request + .decodeCommandCustomHeader(SendMessageRequestHeader.class); + } else { + requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); + } + default: + break; + } + return requestHeader; + } + + private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx, + final RemotingCommand request, + final SendMessageContext sendMessageContext, + final SendMessageRequestHeader requestHeader) { + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); + + response.setOpaque(request.getOpaque()); + + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); + + log.debug("receive SendReplyMessage request command, {}", request); + final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); + if (this.brokerController.getMessageStore().now() < startTimstamp) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); + return response; + } + + response.setCode(-1); + super.msgCheck(ctx, requestHeader, response); + if (response.getCode() != -1) { + return response; + } + + final byte[] body = request.getBody(); + + int queueIdInt = requestHeader.getQueueId(); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + + if (queueIdInt < 0) { + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + } + + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(requestHeader.getTopic()); + msgInner.setQueueId(queueIdInt); + msgInner.setBody(body); + msgInner.setFlag(requestHeader.getFlag()); + MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); + msgInner.setPropertiesString(requestHeader.getProperties()); + msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); + msgInner.setBornHost(ctx.channel().remoteAddress()); + msgInner.setStoreHost(this.getStoreHost()); + msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + + boolean pushOk = this.pushReplyMessage(ctx, requestHeader, msgInner, response); + + if (pushOk && this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) { + PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); + this.handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); + } else { + responseHeader.setMsgId("0"); + responseHeader.setQueueId(0); + responseHeader.setQueueOffset(0L); + } + + return response; + } + + private boolean pushReplyMessage(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final Message msg, final RemotingCommand response) { + ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader(); + replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString()); + replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString()); + replyMessageRequestHeader.setStoreTimestamp(System.currentTimeMillis()); + replyMessageRequestHeader.setProducerGroup(requestHeader.getProducerGroup()); + replyMessageRequestHeader.setTopic(requestHeader.getTopic()); + replyMessageRequestHeader.setDefaultTopic(requestHeader.getDefaultTopic()); + replyMessageRequestHeader.setDefaultTopicQueueNums(requestHeader.getDefaultTopicQueueNums()); + replyMessageRequestHeader.setQueueId(requestHeader.getQueueId()); + replyMessageRequestHeader.setSysFlag(requestHeader.getSysFlag()); + replyMessageRequestHeader.setBornTimestamp(requestHeader.getBornTimestamp()); + replyMessageRequestHeader.setFlag(requestHeader.getFlag()); + replyMessageRequestHeader.setProperties(requestHeader.getProperties()); + replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes()); + replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode()); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader); + request.setBody(msg.getBody()); + + String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO); + boolean pushOk = false; + + if (senderId != null) { + Channel channel = this.brokerController.getProducerManager().findChannel(senderId); + if (channel != null) { + msg.getProperties().put(MessageConst.PROPERTY_PUSH_REPLY_TIME, String.valueOf(System.currentTimeMillis())); + replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); + + try { + RemotingCommand pushResponse = this.brokerController.getBroker2Client().callClient(channel, request); + assert pushResponse != null; + switch (pushResponse.getCode()) { + case ResponseCode.SUCCESS: { + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + pushOk = true; + break; + } + default: { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("push reply message to requester fail"); + log.warn("push reply message to <{}> return fail, remark: {}", senderId, response.getRemark()); + } + } + } catch (InterruptedException e) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("push reply message to requester fail"); + log.warn("push reply message to <{}> fail. {}", senderId, channel, e); + } catch (RemotingException e) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("push reply message to requester fail"); + log.warn("push reply message to <{}> fail. {}", senderId, channel, e); + } + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("channel of <" + senderId + "> not found"); + log.warn("push reply message fial, channel of <{}> not found.", senderId); + } + return pushOk; + } + log.warn("REPLY_TO is null, can not reply message"); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("REPLY_TO is null"); + return pushOk; + } + + private void handlePutMessageResult(PutMessageResult putMessageResult, final RemotingCommand response, + final RemotingCommand request, final MessageExt msg, + final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, + int queueIdInt) { + if (putMessageResult == null) { + response.setRemark("push reply to requester success, but store putMessage return null"); + return ; + } + boolean sendOK = false; + + switch (putMessageResult.getPutMessageStatus()) { + // Success + case PUT_OK: + sendOK = true; + response.setCode(ResponseCode.SUCCESS); + break; + case FLUSH_DISK_TIMEOUT: + response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); + sendOK = true; + break; + case FLUSH_SLAVE_TIMEOUT: + response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); + sendOK = true; + break; + case SLAVE_NOT_AVAILABLE: + response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); + sendOK = true; + break; + + // Failed + case CREATE_MAPEDFILE_FAILED: +// response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("create mapped file failed, server is busy or broken."); + break; + case MESSAGE_ILLEGAL: + case PROPERTIES_SIZE_EXCEEDED: +// response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark( + "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); + break; + case SERVICE_NOT_AVAILABLE: +// response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); + response.setRemark( + "service not available now, maybe disk full, maybe your broker machine memory too small."); + break; + case OS_PAGECACHE_BUSY: +// response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); + break; + case UNKNOWN_ERROR: +// response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UNKNOWN_ERROR"); + break; + default: +// response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UNKNOWN_ERROR DEFAULT"); + break; + } + + String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); + if (sendOK) { + this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); + this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); + response.setRemark(null); + responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); + responseHeader.setQueueId(queueIdInt); + responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); + + if (hasSendMessageHook()) { + sendMessageContext.setMsgId(responseHeader.getMsgId()); + sendMessageContext.setQueueId(responseHeader.getQueueId()); + sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); + + int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); + int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); + int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; + + sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); + sendMessageContext.setCommercialSendTimes(incValue); + sendMessageContext.setCommercialSendSize(wroteSize); + sendMessageContext.setCommercialOwner(owner); + } + } else { + if (hasSendMessageHook()) { + int wroteSize = request.getBody().length; + int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); + + sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); + sendMessageContext.setCommercialSendTimes(incValue); + sendMessageContext.setCommercialSendSize(wroteSize); + sendMessageContext.setCommercialOwner(owner); + } + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 8035ae6f..2589a754 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -343,11 +343,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); - msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); + MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); PutMessageResult putMessageResult = null; Map oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); @@ -536,6 +538,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement messageExtBatch.setBornHost(ctx.channel().remoteAddress()); messageExtBatch.setStoreHost(this.getStoreHost()); messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); + MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName); PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 8f215cdc..577fff57 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -134,6 +134,14 @@ public class TopicConfigManager extends ConfigManager { this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } } + { + String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(1); + topicConfig.setWriteQueueNums(1); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } } public boolean isSystemTopic(final String topic) { diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java index 62a95dfa..633670ae 100644 --- a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java +++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java @@ -23,4 +23,5 @@ public class ClientErrorCode { public static final int BROKER_NOT_EXIST_EXCEPTION = 10003; public static final int NO_NAME_SERVER_EXCEPTION = 10004; public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005; + public static final int REQUEST_TIMEOUT_EXCEPTION = 10006; } \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java new file mode 100644 index 00000000..fd338d60 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java @@ -0,0 +1,40 @@ +package org.apache.rocketmq.client.exception; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.help.FAQUrl; + +public class RequestTimeoutException extends Exception { + private static final long serialVersionUID = -5758410930844185841L; + private int responseCode; + private String errorMessage; + + public RequestTimeoutException(String errorMessage, Throwable cause) { + super(errorMessage, cause); + this.responseCode = -1; + this.errorMessage = errorMessage; + } + + public RequestTimeoutException(int responseCode, String errorMessage) { + super("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " + + errorMessage); + this.responseCode = responseCode; + this.errorMessage = errorMessage; + } + + public int getResponseCode() { + return responseCode; + } + + public RequestTimeoutException setResponseCode(final int responseCode) { + this.responseCode = responseCode; + return this; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(final String errorMessage) { + this.errorMessage = errorMessage; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 0bd810a1..7a17a043 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.client.impl; +import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -25,11 +27,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.MQProducerInner; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.RequestFutureTable; +import org.apache.rocketmq.client.producer.RequestResponseFuture; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.*; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -43,6 +44,8 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestH import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -76,6 +79,9 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { case RequestCode.CONSUME_MESSAGE_DIRECTLY: return this.consumeMessageDirectly(ctx, request); + + case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT: + return this.receiveReplyMssage(ctx, request); default: break; } @@ -213,4 +219,78 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { return response; } + + private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + long receiveTime = System.currentTimeMillis(); + ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class); + + try { + MessageExt msg = new MessageExt(); + msg.setTopic(requestHeader.getTopic()); + msg.setQueueId(requestHeader.getQueueId()); + msg.setStoreTimestamp(requestHeader.getStoreTimestamp()); + + if (requestHeader.getBornHost() != null) { + String[] bornHostArr = requestHeader.getBornHost().split("/"); + String bornHost/*ip:port*/ = bornHostArr[bornHostArr.length - 1]; + String[] host = bornHost.split(":"); + if (host.length == 2) + msg.setBornHost(new InetSocketAddress(host[0], Integer.parseInt(host[1]))); + } + + if (requestHeader.getStoreHost() != null) { + String[] storeHostArr = requestHeader.getStoreHost().split("/"); + String storeHost = storeHostArr[storeHostArr.length - 1]; + String[] host = storeHost.split(":"); + if (host.length == 2) + msg.setStoreHost(new InetSocketAddress(host[0], Integer.parseInt(host[1]))); + } + + byte[] body = request.getBody(); + if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { + try { + body = UtilAll.uncompress(body); + } catch (IOException e) { + log.warn("err when uncompress constant", e); + } + } + msg.setBody(body); + msg.setFlag(requestHeader.getFlag()); + MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties())); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime)); + msg.setBornTimestamp(requestHeader.getBornTimestamp()); + msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + log.debug("receive reply message :{}", msg); + + processReplyMessage(msg); + } catch (Exception e) { + log.warn("unknown err when receiveRRReplyMsg", e); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("process reply message fail"); + } + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private void processReplyMessage(MessageExt replyMsg) { + final String uniqueId = replyMsg.getUserProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(uniqueId); + if (requestResponseFuture != null) { + requestResponseFuture.putResponseMessage(replyMsg); + + RequestFutureTable.getRequestFutureTable().remove(uniqueId); + + if (requestResponseFuture.getRequestCallback() != null) { + requestResponseFuture.getRequestCallback().onSuccess(replyMsg); + } else { + requestResponseFuture.putResponseMessage(replyMsg); + } + } else { + log.warn(String.format("receive reply message, but not matched any request, REQUEST_UNIQ_ID: %s", uniqueId)); + } + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b743af93..f780adf9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -199,6 +199,8 @@ public class MQClientAPIImpl { this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null); + + this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null); } public List getNameServerAddressList() { @@ -419,13 +421,23 @@ public class MQClientAPIImpl { ) throws RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); RemotingCommand request = null; - if (sendSmartMsg || msg instanceof MessageBatch) { - SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); + boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG); + if (isReply) { + if (sendSmartMsg) { + SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); + request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2); + } else { + request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader); + } } else { - request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); + if (sendSmartMsg || msg instanceof MessageBatch) { + SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); + request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + } else { + request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); + } } - request.setBody(msg.getBody()); switch (communicationMode) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 62aaef3b..7a0bc4cd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -24,6 +24,8 @@ import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -39,6 +41,7 @@ import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.client.hook.CheckForbiddenContext; import org.apache.rocketmq.client.hook.CheckForbiddenHook; import org.apache.rocketmq.client.hook.SendMessageContext; @@ -52,6 +55,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.RequestCallback; +import org.apache.rocketmq.client.producer.RequestFutureTable; +import org.apache.rocketmq.client.producer.RequestResponseFuture; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; @@ -79,11 +85,13 @@ import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHe import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.utils.RequestIdUtil; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; @@ -107,6 +115,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { private final BlockingQueue asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; private ExecutorService asyncSenderExecutor; + private final Timer timer = new Timer("RequestHouseKeepingService", true); public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { this(defaultMQProducer, null); @@ -212,6 +221,17 @@ public class DefaultMQProducerImpl implements MQProducerInner { } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + + this.timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + RequestFutureTable.scanExpiredRequest(); + } catch (Throwable e) { + log.error("scan RequestFutureTable exception", e); + } + } + }, 1000 * 3, 1000); } private void checkConfig() throws MQClientException { @@ -1310,6 +1330,244 @@ public class DefaultMQProducerImpl implements MQProducerInner { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } + public Message request(Message msg, long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException { + prepareSendRequest(msg, timeout); + final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + + try { + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + + this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } + + @Override + public void onException(Throwable e) { + requestResponseFuture.setSendReqeustOk(false); + requestResponseFuture.putResponseMessage(null); + requestResponseFuture.setCause(e); + } + }, timeout); + + Message responseMessage = requestResponseFuture.waitResponseMessage(timeout); + if (responseMessage == null) { + if (requestResponseFuture.isSendReqeustOk()) { + throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + } else { + throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); + } + } + return responseMessage; + } finally { + RequestFutureTable.getRequestFutureTable().remove(requestUniqId); + } + } + + public void request(Message msg, final RequestCallback requestCallback, long timeout) throws RemotingException { + prepareSendRequest(msg, timeout); + final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + + try { + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + + this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } + + @Override + public void onException(Throwable e) { + requestResponseFuture.setCause(e); + requestFail(requestUniqId); + } + }, timeout); + } catch (Exception ex) { + log.warn("send request message to <{}> failed.", msg.getTopic(), ex); + throw new RemotingSendRequestException(msg.getTopic(), ex); + } + } + + public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException, RequestTimeoutException { + prepareSendRequest(msg, timeout); + final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + + try { + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + + this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } + + @Override + public void onException(Throwable e) { + requestResponseFuture.setSendReqeustOk(false); + requestResponseFuture.putResponseMessage(null); + requestResponseFuture.setCause(e); + } + }, timeout); + + Message responseMessage = requestResponseFuture.waitResponseMessage(timeout); + if (responseMessage == null) { + if (requestResponseFuture.isSendReqeustOk()) { + throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + } else { + throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); + } + } + return responseMessage; + } finally { + RequestFutureTable.getRequestFutureTable().remove(requestUniqId); + } + } + + public void request(final Message msg, final MessageQueueSelector selector, final Object arg, + final RequestCallback requestCallback, final long timeout) throws RemotingException { + prepareSendRequest(msg, timeout); + final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + + try { + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + + this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } + + @Override + public void onException(Throwable e) { + requestResponseFuture.setCause(e); + requestFail(requestUniqId); + } + }, timeout); + } catch (Exception ex) { + log.warn("send request message to <{}> failed.", msg.getTopic(), ex); + throw new RemotingSendRequestException(msg.getTopic(), ex); + } + } + + public Message request(final Message msg, final MessageQueue mq, final long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { + prepareSendRequest(msg, timeout); + final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + + try { + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + + this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } + + @Override + public void onException(Throwable e) { + requestResponseFuture.setSendReqeustOk(false); + requestResponseFuture.putResponseMessage(null); + requestResponseFuture.setCause(e); + } + }, null, timeout); + + Message responseMessage = requestResponseFuture.waitResponseMessage(timeout); + if (responseMessage == null) { + if (requestResponseFuture.isSendReqeustOk()) { + throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + } else { + throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); + } + } + return responseMessage; + } finally { + RequestFutureTable.getRequestFutureTable().remove(requestUniqId); + } + } + + public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) + throws RemotingException { + prepareSendRequest(msg, timeout); + final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + + try { + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + + this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + requestResponseFuture.setSendReqeustOk(true); + } + + @Override + public void onException(Throwable e) { + requestResponseFuture.setCause(e); + requestFail(requestUniqId); + } + }, null, timeout); + } catch (Exception ex) { + log.warn("send request message to <{}> failed.", msg.getTopic(), ex); + throw new RemotingSendRequestException(msg.getTopic(), ex); + } + } + + private void requestFail(final String requestUniqId) { + RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(requestUniqId); + if (responseFuture != null) { + responseFuture.setSendReqeustOk(false); + responseFuture.putResponseMessage(null); + try { + responseFuture.executeRequestCallback(); + } catch (Exception e) { + log.warn("execute requestCallback in requestFail, and callback throw", e); + } + } + } + + private void prepareSendRequest(final Message msg, long timeout){ + String requestUniqId = RequestIdUtil.createUniqueRequestId(); + String requestClientId = this.getmQClientFactory().getClientId(); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO, requestClientId); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout)); + + boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic()); + if (!hasRouteData) { + long beginTimestamp = System.currentTimeMillis(); + this.tryToFindTopicPublishInfo(msg.getTopic()); + this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); + long cost = System.currentTimeMillis() - beginTimestamp; + if (cost > 500) { + log.warn("prepare send request for <{}> cost {} ms", msg.getTopic(), cost); + } + } + } + + private SendResult reply(final Message msg, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeoutMillis); + } + + private SendResult reply(final Message msg, final SendCallback sendCallback, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + return this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeoutMillis); + } + + public void replyOneway(final Message msg) throws RemotingException, MQClientException, InterruptedException { + this.sendOneway(msg); + } + public ConcurrentMap getTopicPublishInfoTable() { return topicPublishInfoTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index b4acf8f1..4789c4d3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; @@ -582,6 +583,111 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } + /** + * + * @param msg + * @param timeout + * @return + * @throws MQClientException + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + */ + @Override + public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, + RemotingException, MQBrokerException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); + return this.defaultMQProducerImpl.request(msg, timeout); + } + + /** + * + * @param msg + * @param requestCallback + * @param timeout + * @return + * @throws MQClientException + * @throws RemotingException + * @throws InterruptedException + */ + @Override + public void request(final Message msg, final RequestCallback requestCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException, MQBrokerException, RequestTimeoutException { + this.defaultMQProducerImpl.request(msg, requestCallback, timeout); + } + + /** + * + * @param msg + * @param selector + * @param arg + * @param timeout + * @return + * @throws MQClientException + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + */ + @Override + public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException, RequestTimeoutException { + return this.defaultMQProducerImpl.request(msg, selector, arg, timeout); + } + + /** + * + * @param msg + * @param selector + * @param arg + * @param requestCallback + * @param timeout + * @return + * @throws MQClientException + * @throws RemotingException + * @throws InterruptedException + */ + @Override + public void request(final Message msg, final MessageQueueSelector selector, final Object arg, + final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException, + InterruptedException { + this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout); + } + + /** + * + * @param msg + * @param mq + * @param timeout + * @return + * @throws MQClientException + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + */ + @Override + public Message request(final Message msg, final MessageQueue mq, final long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { + return this.defaultMQProducerImpl.request(msg, mq, timeout); + } + + /** + * + * @param msg + * @param mq + * @param requestCallback + * @param timeout + * @return + * @throws MQClientException + * @throws RemotingException + * @throws InterruptedException + */ + @Override + public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout); + } + /** * Same to {@link #sendOneway(Message)} with message queue selector specified. * diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 1af60057..ce8d4a50 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -98,4 +99,25 @@ public interface MQProducer extends MQAdmin { SendResult send(final Collection msgs, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + + //for rpc + Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + void request(final Message msg, final RequestCallback requestCallback, final long timeout) + throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException, MQBrokerException; + + Message request(final Message msg, final MessageQueueSelector selector, final Object arg, + final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + void request(final Message msg, final MessageQueueSelector selector, final Object arg, + final RequestCallback requestCallback, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, + InterruptedException; + + Message request(final Message msg, final MessageQueue mq, final long timeout) + throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; + + void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) + throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java new file mode 100644 index 00000000..4a1874f9 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java @@ -0,0 +1,9 @@ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.common.message.Message; + +public interface RequestCallback { + void onSuccess(final Message message); + + void onException(final Throwable e); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java new file mode 100644 index 00000000..74195be3 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java @@ -0,0 +1,46 @@ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.client.common.ClientErrorCode; +import org.apache.rocketmq.client.exception.RequestTimeoutException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.logging.InternalLogger; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class RequestFutureTable { + private static InternalLogger log = ClientLogger.getLog(); + private static ConcurrentHashMap requestFutureTable = new ConcurrentHashMap(); + + public static ConcurrentHashMap getRequestFutureTable(){ + return requestFutureTable; + } + + public static void scanExpiredRequest(){ + final List rfList = new LinkedList(); + Iterator> it = requestFutureTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + RequestResponseFuture rep = next.getValue(); + + if (rep.isTimeout()) { + it.remove(); + rfList.add(rep); + log.warn("remove timeout request, REQUEST_UNIQ_ID={}" + rep.getRequestUniqId()); + } + } + + for (RequestResponseFuture rf : rfList) { + try { + Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message."); + rf.setCause(cause); + rf.executeRequestCallback(); + } catch (Throwable e) { + log.warn("scanResponseTable, operationComplete Exception", e); + } + } + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java new file mode 100644 index 00000000..a5c5feeb --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java @@ -0,0 +1,117 @@ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.common.message.Message; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class RequestResponseFuture { + private final String requestUniqId; + private long timeoutMillis; + private final RequestCallback requestCallback; + private final long beginTimestamp = System.currentTimeMillis(); + private CountDownLatch countDownLatch = new CountDownLatch(1); + + private AtomicBoolean ececuteCallbackOnlyOnce = new AtomicBoolean(false); + private volatile Message responseMsg = null; + private volatile boolean sendReqeustOk = true; + private volatile Throwable cause = null; + private final Message requestMsg = null; + + + public RequestResponseFuture(String requestUniqId, long timeoutMillis, RequestCallback requestCallback){ + this.requestUniqId = requestUniqId; + this.timeoutMillis = timeoutMillis; + this.requestCallback = requestCallback; + } + + public void executeRequestCallback(){ + if (requestCallback != null) { + if (sendReqeustOk && cause == null) { + requestCallback.onSuccess(responseMsg); + } else { + requestCallback.onException(cause); + } + } + } + + public boolean isTimeout(){ + long diff = System.currentTimeMillis() - this.beginTimestamp; + return diff > this.timeoutMillis; + } + + public Message waitResponseMessage(final long timeout) throws InterruptedException { + this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); + return this.responseMsg; + } + + public void putResponseMessage(final Message responseMsg){ + this.responseMsg = responseMsg; + this.countDownLatch.countDown(); + } + + public String getRequestUniqId() { + return requestUniqId; + } + + public long getTimeoutMillis() { + return timeoutMillis; + } + + public void setTimeoutMillis(long timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + + public RequestCallback getRequestCallback() { + return requestCallback; + } + + public long getBeginTimestamp() { + return beginTimestamp; + } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + public AtomicBoolean getEcecuteCallbackOnlyOnce() { + return ececuteCallbackOnlyOnce; + } + + public void setEcecuteCallbackOnlyOnce(AtomicBoolean ececuteCallbackOnlyOnce) { + this.ececuteCallbackOnlyOnce = ececuteCallbackOnlyOnce; + } + + public Message getResponseMsg() { + return responseMsg; + } + + public void setResponseMsg(Message responseMsg) { + this.responseMsg = responseMsg; + } + + public boolean isSendReqeustOk() { + return sendReqeustOk; + } + + public void setSendReqeustOk(boolean sendReqeustOk) { + this.sendReqeustOk = sendReqeustOk; + } + + public Message getRequestMsg() { + return requestMsg; + } + + public Throwable getCause() { + return cause; + } + + public void setCause(Throwable cause) { + this.cause = cause; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java new file mode 100644 index 00000000..ce417752 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java @@ -0,0 +1,30 @@ +package org.apache.rocketmq.client.utils; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; + +public class MessageUtil { + public static Message createReplyMessage(final Message requestMessage) { + if (requestMessage != null) { + Message replyMessage = new Message(); + String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER); + String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); + String requestUniqId = requestMessage.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL); + if (cluster == null) { + + } + String replyTopic = MixAll.getReplyTopic(cluster); + replyMessage.setTopic(replyTopic); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO, replyTo); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl); + + return replyMessage; + } + return null; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 1c3f37d0..c3613e7d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -61,6 +61,7 @@ public class BrokerConfig { */ private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors(); private int adminBrokerThreadPoolNums = 16; @@ -83,6 +84,7 @@ public class BrokerConfig { private boolean fetchNamesrvAddrByAddressServer = false; private int sendThreadPoolQueueCapacity = 10000; private int pullThreadPoolQueueCapacity = 100000; + private int replyThreadPoolQueueCapacity = 10000; private int queryThreadPoolQueueCapacity = 20000; private int clientManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000; @@ -180,6 +182,8 @@ public class BrokerConfig { @ImportantField private boolean aclEnable = false; + private boolean storeReplyMessageEnable = true; + public static String localHostName() { try { return InetAddress.getLocalHost().getHostName(); @@ -374,6 +378,14 @@ public class BrokerConfig { this.pullMessageThreadPoolNums = pullMessageThreadPoolNums; } + public int getProcessReplyMessageThreadPoolNums() { + return processReplyMessageThreadPoolNums; + } + + public void setProcessReplyMessageThreadPoolNums(int processReplyMessageThreadPoolNums) { + this.processReplyMessageThreadPoolNums = processReplyMessageThreadPoolNums; + } + public int getQueryMessageThreadPoolNums() { return queryMessageThreadPoolNums; } @@ -470,6 +482,14 @@ public class BrokerConfig { this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity; } + public int getReplyThreadPoolQueueCapacity() { + return replyThreadPoolQueueCapacity; + } + + public void setReplyThreadPoolQueueCapacity(int replyThreadPoolQueueCapacity) { + this.replyThreadPoolQueueCapacity = replyThreadPoolQueueCapacity; + } + public int getQueryThreadPoolQueueCapacity() { return queryThreadPoolQueueCapacity; } @@ -765,4 +785,12 @@ public class BrokerConfig { public void setAclEnable(boolean aclEnable) { this.aclEnable = aclEnable; } + + public boolean isStoreReplyMessageEnable() { + return storeReplyMessageEnable; + } + + public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) { + this.storeReplyMessageEnable = storeReplyMessageEnable; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 0af65dff..a6151e55 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -84,6 +84,7 @@ public class MixAll { public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; + public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC"; public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY"; public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion"; @@ -96,6 +97,8 @@ public class MixAll { public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; + public static final String REPLY_MESSAGE_FLAG = "reply"; + public static String getWSAddr() { String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); @@ -110,6 +113,10 @@ public class MixAll { return RETRY_GROUP_TOPIC_PREFIX + consumerGroup; } + public static String getReplyTopic(final String clusterName) { + return clusterName + "_" + REPLY_TOPIC_POSTFIX; + } + public static boolean isSysConsumerGroup(final String consumerGroup) { return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index aa848164..178e417c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -45,6 +45,13 @@ public class MessageConst { public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES"; public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS"; public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID"; + public static final String PROPERTY_REQUEST_UNIQ_ID = "REQUEST_UNIQ_ID"; + public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO"; + public static final String PROPERTY_MESSAGE_TTL = "TTL"; + public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME"; + public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME"; + public static final String PROPERTY_CLUSTER = "CLUSTER"; + public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE"; public static final String KEY_SEPARATOR = " "; @@ -74,5 +81,12 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES); STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP); STRING_HASH_SET.add(PROPERTY_INSTANCE_ID); + STRING_HASH_SET.add(PROPERTY_REQUEST_UNIQ_ID); + STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO); + STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL); + STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME); + STRING_HASH_SET.add(PROPERTY_PUSH_REPLY_TIME); + STRING_HASH_SET.add(PROPERTY_CLUSTER); + STRING_HASH_SET.add(PROPERTY_MESSAGE_TYPE); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 58c4b9fe..ef7f2a1f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -180,4 +180,10 @@ public class RequestCode { * resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before */ public static final int RESUME_CHECK_HALF_MESSAGE = 323; + + public static final int SEND_REPLY_MESSAGE = 324; + + public static final int SEND_REPLY_MESSAGE_V2 = 325; + + public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java new file mode 100644 index 00000000..465baeae --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java @@ -0,0 +1,153 @@ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.annotation.CFNullable; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class ReplyMessageRequestHeader implements CommandCustomHeader { + @CFNotNull + private String producerGroup; + @CFNotNull + private String topic; + @CFNotNull + private String defaultTopic; + @CFNotNull + private Integer defaultTopicQueueNums; + @CFNotNull + private Integer queueId; + @CFNotNull + private Integer sysFlag; + @CFNotNull + private Long bornTimestamp; + @CFNotNull + private Integer flag; + @CFNullable + private String properties; + @CFNullable + private Integer reconsumeTimes; + @CFNullable + private boolean unitMode = false; + + @CFNotNull + private String bornHost; + @CFNotNull + private String storeHost; + @CFNotNull + private long storeTimestamp; + + public void checkFields() throws RemotingCommandException { + } + + public String getProducerGroup() { + return producerGroup; + } + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getDefaultTopic() { + return defaultTopic; + } + + public void setDefaultTopic(String defaultTopic) { + this.defaultTopic = defaultTopic; + } + + public Integer getDefaultTopicQueueNums() { + return defaultTopicQueueNums; + } + + public void setDefaultTopicQueueNums(Integer defaultTopicQueueNums) { + this.defaultTopicQueueNums = defaultTopicQueueNums; + } + + public Integer getQueueId() { + return queueId; + } + + public void setQueueId(Integer queueId) { + this.queueId = queueId; + } + + public Integer getSysFlag() { + return sysFlag; + } + + public void setSysFlag(Integer sysFlag) { + this.sysFlag = sysFlag; + } + + public Long getBornTimestamp() { + return bornTimestamp; + } + + public void setBornTimestamp(Long bornTimestamp) { + this.bornTimestamp = bornTimestamp; + } + + public Integer getFlag() { + return flag; + } + + public void setFlag(Integer flag) { + this.flag = flag; + } + + public String getProperties() { + return properties; + } + + public void setProperties(String properties) { + this.properties = properties; + } + + public Integer getReconsumeTimes() { + return reconsumeTimes; + } + + public void setReconsumeTimes(Integer reconsumeTimes) { + this.reconsumeTimes = reconsumeTimes; + } + + public boolean isUnitMode() { + return unitMode; + } + + public void setUnitMode(boolean unitMode) { + this.unitMode = unitMode; + } + + public String getBornHost() { + return bornHost; + } + + public void setBornHost(String bornHost) { + this.bornHost = bornHost; + } + + public String getStoreHost() { + return storeHost; + } + + public void setStoreHost(String storeHost) { + this.storeHost = storeHost; + } + + public long getStoreTimestamp() { + return storeTimestamp; + } + + public void setStoreTimestamp(long storeTimestamp) { + this.storeTimestamp = storeTimestamp; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java new file mode 100644 index 00000000..9a58e5fd --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java @@ -0,0 +1,9 @@ +package org.apache.rocketmq.common.utils; + +import java.util.UUID; + +public class RequestIdUtil { + public static String createUniqueRequestId(){ + return UUID.randomUUID().toString(); + } +} -- GitLab