diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java index 5a0f5c05e90964183f253abb5f4fe22848cd7aa2..ae5d0770899ec615d342b2230a1d84babcee93e5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java @@ -35,6 +35,7 @@ public class ConsumeMessageContext { private BrokerStatsManager.StatsType commercialRcvStats; private int commercialRcvTimes; private int commercialRcvSize; + private String namespace; public String getConsumerGroup() { return consumerGroup; @@ -147,4 +148,12 @@ public class ConsumeMessageContext { public void setCommercialRcvSize(final int commercialRcvSize) { this.commercialRcvSize = commercialRcvSize; } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java index 0833403011f0b73291764196c27253c72cafaeaf..ab6452e593c13c7b1624fea174b5fa05331500da 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java @@ -40,11 +40,12 @@ public class SendMessageContext { private long bornTimeStamp; private MessageType msgType = MessageType.Trans_msg_Commit; private boolean isSuccess = false; - //For Commercial + private String commercialOwner; private BrokerStatsManager.StatsType commercialSendStats; private int commercialSendSize; private int commercialSendTimes; + private String namespace; public boolean isSuccess() { return isSuccess; @@ -229,4 +230,12 @@ public class SendMessageContext { public void setCommercialSendTimes(final int commercialSendTimes) { this.commercialSendTimes = commercialSendTimes; } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index aa072e8b6a517144ca1d1c92f1c621127a55c973..b0668d49f87ef166da16d0b37090c0f102772e63 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.broker.processor; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Random; + import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; @@ -27,11 +33,10 @@ import org.apache.rocketmq.common.constant.DBMsgConstants; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; @@ -40,18 +45,14 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.utils.ChannelUtil; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; -import java.util.Map; -import java.util.Random; - public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -73,9 +74,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces if (!this.hasSendMessageHook()) { return null; } + String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()); SendMessageContext mqtraceContext; mqtraceContext = new SendMessageContext(); mqtraceContext.setProducerGroup(requestHeader.getProducerGroup()); + mqtraceContext.setNamespace(namespace); mqtraceContext.setTopic(requestHeader.getTopic()); mqtraceContext.setMsgProps(requestHeader.getProperties()); mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); @@ -253,7 +256,9 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces try { final SendMessageRequestHeader requestHeader = parseRequestHeader(request); + String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()); if (null != requestHeader) { + context.setNamespace(namespace); context.setProducerGroup(requestHeader.getProducerGroup()); context.setTopic(requestHeader.getTopic()); context.setBodyLength(request.getBody().length); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index b7e7a6187b73ba12449094a726aaacf396b9db87..8035ae6f185b5c91db73eb0ceea5e0a26f43b3f3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -16,6 +16,10 @@ */ package org.apache.rocketmq.broker.processor; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map; + import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; @@ -33,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; @@ -49,10 +54,6 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import java.net.SocketAddress; -import java.util.List; -import java.util.Map; - public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private List consumeMessageHookList; @@ -101,9 +102,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement final ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); + String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup()); if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) { ConsumeMessageContext context = new ConsumeMessageContext(); + context.setNamespace(namespace); context.setConsumerGroup(requestHeader.getGroup()); context.setTopic(requestHeader.getOriginTopic()); context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK); diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 5d488d7e3f6ee082dc855d4663ee117fd33e8dbb..9a66744c4dedf17732de5e3fb711386e3d03fec6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -16,8 +16,14 @@ */ package org.apache.rocketmq.client; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.LanguageCode; @@ -31,6 +37,7 @@ public class ClientConfig { private String clientIP = RemotingUtil.getLocalAddress(); private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); + protected String namespace; /** * Pulling topic information interval from the named server */ @@ -87,6 +94,38 @@ public class ClientConfig { } } + + public String withNamespace(String resource) { + return NamespaceUtil.wrapNamespace(this.getNamespace(), resource); + } + + public Set withNamespace(Set resourceSet) { + Set resourceWithNamespace = new HashSet(); + for (String resource : resourceSet) { + resourceWithNamespace.add(withNamespace(resource)); + } + return resourceWithNamespace; + } + + public String withoutNamespace(String resource) { + return NamespaceUtil.withoutNamespace(resource, this.getNamespace()); + } + + public Set withoutNamespace(Set resourceSet) { + Set resourceWithoutNamespace = new HashSet(); + for (String resource : resourceSet) { + resourceWithoutNamespace.add(withoutNamespace(resource)); + } + return resourceWithoutNamespace; + } + + public MessageQueue queueWithNamespace(MessageQueue queue) { + if (StringUtils.isEmpty(this.getNamespace())) { + return queue; + } + + return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId()); + } public void resetClientConfig(final ClientConfig cc) { this.namesrvAddr = cc.namesrvAddr; this.clientIP = cc.clientIP; @@ -99,6 +138,7 @@ public class ClientConfig { this.unitName = cc.unitName; this.vipChannelEnabled = cc.vipChannelEnabled; this.useTLS = cc.useTLS; + this.namespace = cc.namespace; this.language = cc.language; } @@ -115,6 +155,7 @@ public class ClientConfig { cc.unitName = unitName; cc.vipChannelEnabled = vipChannelEnabled; cc.useTLS = useTLS; + cc.namespace = namespace; cc.language = language; return cc; } @@ -199,12 +240,20 @@ public class ClientConfig { this.language = language; } + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" - + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]"; + + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]"; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index f4c63df48f65d5a1f5e36fa8da62186a832d808c..f3b6caaa71f23861f8bc767e5f64178cef1ad752 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -84,20 +85,34 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume private int maxReconsumeTimes = 16; public DefaultMQPullConsumer() { - this(MixAll.DEFAULT_CONSUMER_GROUP, null); - } - - public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) { - this.consumerGroup = consumerGroup; - defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook); + this(null, MixAll.DEFAULT_CONSUMER_GROUP, null); } public DefaultMQPullConsumer(final String consumerGroup) { - this(consumerGroup, null); + this(null, consumerGroup, null); } public DefaultMQPullConsumer(RPCHook rpcHook) { - this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); + this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); + } + + public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) { + this(null, consumerGroup, rpcHook); + } + + public DefaultMQPullConsumer(final String namespace, final String consumerGroup) { + this(namespace, consumerGroup, null); + } + /** + * Constructor specifying namespace, consumer group and RPC hook. + * + * @param consumerGroup Consumer group. + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { + this.namespace = namespace; + this.consumerGroup = consumerGroup; + defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook); } /** @@ -106,7 +121,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); + createTopic(key, withNamespace(newTopic), queueNum, 0); } /** @@ -115,7 +130,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag); } /** @@ -124,7 +139,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Deprecated @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp); + return this.defaultMQPullConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp); } /** @@ -133,7 +148,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Deprecated @Override public long maxOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQPullConsumerImpl.maxOffset(mq); + return this.defaultMQPullConsumerImpl.maxOffset(queueWithNamespace(mq)); } /** @@ -142,7 +157,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Deprecated @Override public long minOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQPullConsumerImpl.minOffset(mq); + return this.defaultMQPullConsumerImpl.minOffset(queueWithNamespace(mq)); } /** @@ -151,7 +166,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Deprecated @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq); + return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq)); } /** @@ -171,7 +186,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { - return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end); + return this.defaultMQPullConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end); } public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { @@ -239,7 +254,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume } public void setRegisterTopics(Set registerTopics) { - this.registerTopics = registerTopics; + this.registerTopics = withNamespace(registerTopics); } /** @@ -250,6 +265,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Override public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null); } @@ -261,16 +277,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Override public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName); } @Override public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { - return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic); + return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic)); } @Override public void start() throws MQClientException { + this.setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPullConsumerImpl.start(); } @@ -282,7 +300,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Override public void registerMessageQueueListener(String topic, MessageQueueListener listener) { synchronized (this.registerTopics) { - this.registerTopics.add(topic); + this.registerTopics.add(withNamespace(topic)); if (listener != null) { this.messageQueueListener = listener; } @@ -292,80 +310,80 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume @Override public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums); + return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums); } @Override public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout); + return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout); } @Override public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums); + return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums); } @Override public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout); + return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, timeout); } @Override public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback); + this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback); } @Override public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout); + this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout); } @Override public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback); + this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, pullCallback); } @Override public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout); + this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, pullCallback, timeout); } @Override public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums); + return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums); } @Override public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback); + this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback); } @Override public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException { - this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset); + this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset); } @Override public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException { - return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, fromStore); + return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore); } @Override public Set fetchMessageQueuesInBalance(String topic) throws MQClientException { - return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic); + return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(withNamespace(topic)); } @Override @@ -377,12 +395,13 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume } catch (Exception e) { // Ignore } - return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, uniqKey); + return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(withNamespace(topic), uniqKey); } @Override public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup); } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 3a0f2be9af17ee0788922f5226e2ced3cf810a50..44edfb68b6fe62533e0dbeec999ab54e1a2334b1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -262,7 +263,47 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * Default constructor. */ public DefaultMQPushConsumer() { - this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); + this(null, MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); + } + + /** + * Constructor specifying consumer group. + * + * @param consumerGroup Consumer group. + */ + public DefaultMQPushConsumer(final String consumerGroup) { + this(null, consumerGroup, null, new AllocateMessageQueueAveragely()); + } + + /** + * Constructor specifying namespace and consumer group. + * + * @param namespace Namespace for this MQ Producer instance. + * @param consumerGroup Consumer group. + */ + public DefaultMQPushConsumer(final String namespace, final String consumerGroup) { + this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely()); + } + + + /** + * Constructor specifying RPC hook. + * + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultMQPushConsumer(RPCHook rpcHook) { + this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); + } + + /** + * Constructor specifying namespace, consumer group and RPC hook . + * + * @param namespace Namespace for this MQ Producer instance. + * @param consumerGroup Consumer group. + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { + this(namespace, consumerGroup, rpcHook, new AllocateMessageQueueAveragely()); } /** @@ -274,48 +315,25 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { - this.consumerGroup = consumerGroup; - this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; - defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); + this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy); } /** - * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name. + * Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm. * + * @param namespace Namespace for this MQ Producer instance. * @param consumerGroup Consume queue. * @param rpcHook RPC hook to execute before each remoting command. - * @param allocateMessageQueueStrategy message queue allocating algorithm. - * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. + * @param allocateMessageQueueStrategy Message queue allocating algorithm. */ - public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, - AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { + public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; + this.namespace = namespace; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); - if (enableMsgTrace) { - try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); - dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); - traceDispatcher = dispatcher; - this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( - new ConsumeMessageTraceHookImpl(traceDispatcher)); - } catch (Throwable e) { - log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); - } - } } - /** - * Constructor specifying RPC hook. - * - * @param rpcHook RPC hook to execute before each remoting command. - */ - public DefaultMQPushConsumer(RPCHook rpcHook) { - this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); - } - - /** * Constructor specifying consumer group and enabled msg trace flag. * @@ -323,7 +341,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param enableMsgTrace Switch flag instance for message trace. */ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) { - this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null); + this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null); } /** @@ -334,16 +352,51 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { - this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); + this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); } + /** - * Constructor specifying consumer group. + * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name. * - * @param consumerGroup Consumer group. + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param allocateMessageQueueStrategy message queue allocating algorithm. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. */ - public DefaultMQPushConsumer(final String consumerGroup) { - this(consumerGroup, null, new AllocateMessageQueueAveragely()); + public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { + this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy, enableMsgTrace, customizedTraceTopic); + } + + /** + * Constructor specifying namespace, consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name. + * + * @param namespace Namespace for this MQ Producer instance. + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param allocateMessageQueueStrategy message queue allocating algorithm. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. + */ + public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { + this.consumerGroup = consumerGroup; + this.namespace = namespace; + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); + if (enableMsgTrace) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); + traceDispatcher = dispatcher; + this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( + new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } } /** @@ -352,7 +405,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); + createTopic(key, withNamespace(newTopic), queueNum, 0); } /** @@ -361,7 +414,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag); } /** @@ -370,7 +423,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Deprecated @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp); + return this.defaultMQPushConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp); } /** @@ -379,7 +432,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Deprecated @Override public long maxOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQPushConsumerImpl.maxOffset(mq); + return this.defaultMQPushConsumerImpl.maxOffset(queueWithNamespace(mq)); } /** @@ -388,7 +441,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Deprecated @Override public long minOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQPushConsumerImpl.minOffset(mq); + return this.defaultMQPushConsumerImpl.minOffset(queueWithNamespace(mq)); } /** @@ -397,7 +450,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Deprecated @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq); + return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq)); } /** @@ -417,7 +470,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { - return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end); + return this.defaultMQPushConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end); } /** @@ -433,7 +486,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume } catch (Exception e) { // Ignore } - return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId); + return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(withNamespace(topic), msgId); } public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { @@ -573,7 +626,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ @Deprecated public void setSubscription(Map subscription) { - this.subscription = subscription; + Map subscriptionWithNamespace = new HashMap(); + for (String topic : subscription.keySet()) { + subscriptionWithNamespace.put(withNamespace(topic), subscription.get(topic)); + } + this.subscription = subscriptionWithNamespace; } /** @@ -593,6 +650,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Override public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); } @@ -615,12 +673,13 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume @Override public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName); } @Override public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { - return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic); + return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic)); } /** @@ -630,6 +689,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ @Override public void start() throws MQClientException { + setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { @@ -690,7 +750,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ @Override public void subscribe(String topic, String subExpression) throws MQClientException { - this.defaultMQPushConsumerImpl.subscribe(topic, subExpression); + this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression); } /** @@ -702,7 +762,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ @Override public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { - this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource); + this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), fullClassName, filterClassSource); } /** @@ -715,7 +775,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ @Override public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException { - this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector); + this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), messageSelector); } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java index 28381b064e7b59c53554f0f8eef3ff008d3695a3..835852e9e32cf20164ff4b8209fe97f627a5f48f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java @@ -29,6 +29,7 @@ public class ConsumeMessageContext { private String status; private Object mqTraceContext; private Map props; + private String namespace; public String getConsumerGroup() { return consumerGroup; @@ -85,4 +86,12 @@ public class ConsumeMessageContext { public void setStatus(String status) { this.status = status; } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java index 00723fdff8b73a4362c5f68d296ff3b9cb701c3e..e970d81fc8ef6c91f3d2bacaad0af81f4bdf8168 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java @@ -37,6 +37,7 @@ public class SendMessageContext { private Map props; private DefaultMQProducerImpl producer; private MessageType msgType = MessageType.Normal_Msg; + private String namespace; public MessageType getMsgType() { return msgType; @@ -133,4 +134,12 @@ public class SendMessageContext { public void setBornHost(String bornHost) { this.bornHost = bornHost; } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index fe0db96c907283dd90064ae8a237b6d102db2837..0bd810a1e1db1b4df45e091052e2aac7394a2130 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -16,19 +16,21 @@ */ package org.apache.rocketmq.client.impl; -import io.netty.channel.ChannelHandlerContext; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.MQProducerInner; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; @@ -41,6 +43,7 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestH import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; @@ -91,6 +94,10 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody()); final MessageExt messageExt = MessageDecoder.decode(byteBuffer); if (messageExt != null) { + if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) { + messageExt.setTopic(NamespaceUtil + .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace())); + } String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { messageExt.setTransactionId(transactionId); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 3d2df0f5dd1535e6374fa1c921b9b956da3cdf60..ca89d613060b9b1aa4cb818760b4f9767b5f68ef 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.impl; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -35,6 +36,7 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; @@ -136,7 +138,7 @@ public class MQAdminImpl { if (topicRouteData != null) { TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData); if (topicPublishInfo != null && topicPublishInfo.ok()) { - return topicPublishInfo.getMessageQueueList(); + return parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); } } } catch (Exception e) { @@ -146,6 +148,16 @@ public class MQAdminImpl { throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); } + public List parsePublishMessageQueues(List messageQueueList) { + List resultQueues = new ArrayList(); + for (MessageQueue queue : messageQueueList) { + String userTopic = NamespaceUtil.withoutNamespace(queue.getTopic(), this.mQClientFactory.getClientConfig().getNamespace()); + resultQueues.add(new MessageQueue(userTopic, queue.getBrokerName(), queue.getQueueId())); + } + + return resultQueues; + } + public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { try { TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); @@ -407,6 +419,13 @@ public class MQAdminImpl { } } + //If namespace not null , reset Topic without namespace. + for (MessageExt messageExt : messageList) { + if (null != this.mQClientFactory.getClientConfig().getNamespace()) { + messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.mQClientFactory.getClientConfig().getNamespace())); + } + } + if (!messageList.isEmpty()) { return new QueryResult(indexLastUpdateTimestamp, messageList); } else { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 1837204a3090802c47fad5dff5b0af05c3915212..9048ab8545c825487cec0c3b5613d45e90bbbed0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -56,6 +58,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.namesrv.TopAddressing; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; @@ -522,7 +525,13 @@ public class MQClientAPIImpl { SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); - MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId()); + //If namespace not null , reset Topic without namespace. + String topic = msg.getTopic(); + if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) { + topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()); + } + + MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId()); String uniqMsgId = MessageClientIDSetter.getUniqID(msg); if (msg instanceof MessageBatch) { @@ -665,6 +674,10 @@ public class MQClientAPIImpl { case ResponseCode.SUCCESS: { ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody()); MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true); + //If namespace not null , reset Topic without namespace. + if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) { + messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.clientConfig.getNamespace())); + } return messageExt; } default: diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index 58b985f10111e66a6cb4a3b14a5830162a0f0fe8..258e4dbf8777fd6c5d1f0aa0cb947da332c6064f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -29,6 +29,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -39,13 +40,12 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.stat.ConsumerStatsManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.CMResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { @@ -157,7 +157,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq); - this.resetRetryTopic(msgs); + this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup); final long beginTime = System.currentTimeMillis(); @@ -236,15 +236,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } } - public void resetRetryTopic(final List msgs) { - final String groupTopic = MixAll.getRetryTopic(consumerGroup); - for (MessageExt msg : msgs) { - String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); - if (retryTopic != null && groupTopic.equals(msg.getTopic())) { - msg.setTopic(retryTopic); - } - } - } private void cleanExpireMsg() { Iterator> it = @@ -326,6 +317,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) { int delayLevel = context.getDelayLevelWhenNextConsume(); + // Wrap topic with namespace before sending back message. + msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic())); try { this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName()); return true; @@ -392,10 +385,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; + defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); + consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap()); consumeMessageContext.setMq(messageQueue); @@ -408,7 +403,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { - ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs); if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index d4a9953e783f06e16b1d07a5f7b18bea8dbfaafe..edc2647a5f1d9c32503910dd5cf2aeeb7588deff 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -26,6 +26,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; @@ -37,6 +39,7 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; @@ -142,6 +145,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq); + this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup); + final long beginTime = System.currentTimeMillis(); log.info("consumeMessageDirectly receive new message: {}", msg); @@ -380,6 +385,14 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { return false; } + public void resetNamespace(final List msgs) { + for (MessageExt msg : msgs) { + if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) { + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); + } + } + } + class ConsumeRequest implements Runnable { private final ProcessQueue processQueue; private final MessageQueue messageQueue; @@ -439,6 +452,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); List msgs = this.processQueue.takeMessags(consumeBatchSize); + defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); @@ -449,6 +463,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); + consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 39c43d592d7772d335e166fa88af6a8322fd9438..8aff14b748456f49b0fb653804950807a45ccde3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; @@ -125,7 +126,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } } - return mqResult; + return parseSubscribeMessageQueues(mqResult); } public List fetchPublishMessageQueues(String topic) throws MQClientException { @@ -135,7 +136,23 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic); + // check if has info in memory, otherwise invoke api. + Set result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); + if (null == result) { + result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic); + } + + return parseSubscribeMessageQueues(result); + } + + public Set parseSubscribeMessageQueues(Set queueSet) { + Set resultQueues = new HashSet(); + for (MessageQueue messageQueue : queueSet) { + String userTopic = NamespaceUtil.withoutNamespace(messageQueue.getTopic(), + this.defaultMQPullConsumer.getNamespace()); + resultQueues.add(new MessageQueue(userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId())); + } + return resultQueues; } public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { @@ -244,9 +261,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { null ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); + //If namespace not null , reset Topic without namespace. + this.resetTopic(pullResult.getMsgFoundList()); if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = null; consumeMessageContext = new ConsumeMessageContext(); + consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(mq); consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); @@ -259,6 +279,20 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { return pullResult; } + public void resetTopic(List msgList) { + if (null == msgList || msgList.size() == 0) { + return; + } + + //If namespace not null , reset Topic without namespace. + for (MessageExt messageExt : msgList) { + if (null != this.getDefaultMQPullConsumer().getNamespace()) { + messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.defaultMQPullConsumer.getNamespace())); + } + } + + } + public void subscriptionAutomatically(final String topic) { if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) { try { @@ -474,8 +508,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { @Override public void onSuccess(PullResult pullResult) { - pullCallback - .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData)); + PullResult userPullResult = DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); + resetTopic(userPullResult.getMsgFoundList()); + pullCallback.onSuccess(userPullResult); } @Override @@ -558,6 +593,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); + } finally { + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPullConsumer.getNamespace())); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 393ef92c9961c5d628b9a19c17017f4061501244..48fe41ada6d6c376cec49d589b2cf6881e72f2c7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -27,6 +27,8 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -56,6 +58,7 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; @@ -169,7 +172,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { throw new MQClientException("The topic[" + topic + "] not exist", null); } - return result; + return parseSubscribeMessageQueues(result); + } + + public Set parseSubscribeMessageQueues(Set messageQueueList) { + Set resultQueues = new HashSet(); + for (MessageQueue queue : messageQueueList) { + String userTopic = NamespaceUtil.withoutNamespace(queue.getTopic(), this.defaultMQPushConsumer.getNamespace()); + resultQueues.add(new MessageQueue(userTopic, queue.getBrokerName(), queue.getQueueId())); + } + + return resultQueues; } public DefaultMQPushConsumer getDefaultMQPushConsumer() { @@ -517,6 +530,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); + } finally { + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); } } @@ -1131,6 +1146,20 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { return queueTimeSpan; } + public void resetRetryAndNamespace(final List msgs, String consumerGroup) { + final String groupTopic = MixAll.getRetryTopic(consumerGroup); + for (MessageExt msg : msgs) { + String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); + if (retryTopic != null && groupTopic.equals(msg.getTopic())) { + msg.setTopic(retryTopic); + } + + if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) { + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); + } + } + } + public ConsumeMessageService getConsumeMessageService() { return consumeMessageService; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 57d2eda18697c7347d53362500eff59985ee9273..2a4fb7dfa6cedc44079425b5c455b85db56e9e1c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -63,6 +65,7 @@ import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -362,6 +365,26 @@ public class MQClientInstance { } } + /** + * + * @param offsetTable + * @param namespace + * @return newOffsetTable + */ + public Map parseOffsetTableFromBroker(Map offsetTable, String namespace) { + HashMap newOffsetTable = new HashMap(); + if (StringUtils.isNotEmpty(namespace)) { + for (Entry entry : offsetTable.entrySet()) { + MessageQueue queue = entry.getKey(); + queue.setTopic(NamespaceUtil.withoutNamespace(queue.getTopic(), namespace)); + newOffsetTable.put(queue, entry.getValue()); + } + } else { + newOffsetTable.putAll(offsetTable); + } + + return newOffsetTable; + } /** * Remove offline broker */ @@ -1220,4 +1243,8 @@ public class MQClientInstance { public NettyClientConfig getNettyClientConfig() { return nettyClientConfig; } + + public ClientConfig getClientConfig() { + return clientConfig; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 6b8fdbd73d4d0f918b2a61492cad449616ccd912..62aaef3b13ff9996c65a712867120dc4321360e5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -73,6 +73,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; @@ -543,6 +544,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); + if (times > 0) { + //Reset topic with namespace during resend. + msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); + } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; @@ -699,6 +704,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { MessageClientIDSetter.setUniqID(msg); } + boolean topicWithNamespace = false; + if (null != this.mQClientFactory.getClientConfig().getNamespace()) { + msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); + topicWithNamespace = true; + } + int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { @@ -732,6 +743,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); + context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); @@ -774,13 +786,24 @@ public class DefaultMQProducerImpl implements MQProducerInner { switch (communicationMode) { case ASYNC: Message tmpMessage = msg; + boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); + messageCloned = true; msg.setBody(prevBody); } + + if (topicWithNamespace) { + if (!messageCloned) { + tmpMessage = MessageAccessor.cloneMessage(msg); + messageCloned = true; + } + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); + } + long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); @@ -846,6 +869,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { throw e; } finally { msg.setBody(prevBody); + msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } @@ -1059,7 +1083,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; try { - mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); + List messageQueueList = + mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); + Message userMessage = MessageAccessor.cloneMessage(msg); + String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); + userMessage.setTopic(userTopic); + + mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } @@ -1323,4 +1353,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable); } + + public DefaultMQProducer getDefaultMQProducer() { + return defaultMQProducer; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index d1ccb30fb932bdf72b375264ec45e527c941f6ed..3a26e832497b371be0318d133bf03a576eacd747 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -134,22 +134,29 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Default constructor. */ public DefaultMQProducer() { - this(MixAll.DEFAULT_PRODUCER_GROUP, null); + this(null, MixAll.DEFAULT_PRODUCER_GROUP, null); } /** - * Constructor specifying both producer group and RPC hook. + * Constructor specifying the RPC hook. * - * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. */ - public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { - this.producerGroup = producerGroup; - defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + public DefaultMQProducer(RPCHook rpcHook) { + this(null, MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); + } + + /** + * Constructor specifying producer group. + * + * @param producerGroup Producer group, see the name-sake field. + */ + public DefaultMQProducer(final String producerGroup) { + this(null, producerGroup, null); } /** - * Constructor specifying producer group, RPC hook, enabled msgTrace flag and customized trace topic name. + * Constructor specifying producer group. * * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. @@ -178,10 +185,34 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Constructor specifying producer group. * + * @param namespace Namespace for this MQ Producer instance. * @param producerGroup Producer group, see the name-sake field. */ - public DefaultMQProducer(final String producerGroup) { - this(producerGroup, null); + public DefaultMQProducer(final String namespace, final String producerGroup) { + this(namespace, producerGroup, null); + } + + /** + * Constructor specifying both producer group and RPC hook. + * + * @param producerGroup Producer group, see the name-sake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + */ + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { + this(null, producerGroup, rpcHook); + } + + /** + * Constructor specifying namespace, producer group and RPC hook. + * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + */ + public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { + this.namespace = namespace; + this.producerGroup = producerGroup; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); } /** @@ -191,7 +222,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @param enableMsgTrace Switch flag instance for message trace. */ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) { - this(producerGroup, null, enableMsgTrace, null); + this(null, producerGroup, null, enableMsgTrace, null); } /** @@ -203,16 +234,34 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * trace topic name. */ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { - this(producerGroup, null, enableMsgTrace, customizedTraceTopic); + this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic); } /** - * Constructor specifying the RPC hook. + * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic name. * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. */ - public DefaultMQProducer(RPCHook rpcHook) { - this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); + public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { + this.namespace = namespace; + this.producerGroup = producerGroup; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + //if client open the message trace feature + if (enableMsgTrace) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); + traceDispatcher = dispatcher; + this.getDefaultMQProducerImpl().registerSendMessageHook( + new SendMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } } /** @@ -229,6 +278,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public void start() throws MQClientException { + this.setProducerGroup(withNamespace(this.producerGroup)); this.defaultMQProducerImpl.start(); if (null != traceDispatcher) { try { @@ -259,7 +309,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public List fetchPublishMessageQueues(String topic) throws MQClientException { - return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic); + return this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic)); } /** @@ -281,6 +331,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg); } @@ -299,6 +350,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg, timeout); } @@ -322,6 +374,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.send(msg, sendCallback); } @@ -338,6 +391,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void send(Message msg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.send(msg, sendCallback, timeout); } @@ -352,6 +406,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.sendOneway(msg); } @@ -370,7 +425,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQProducerImpl.send(msg, mq); + msg.setTopic(withNamespace(msg.getTopic())); + return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq)); } /** @@ -389,7 +445,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public SendResult send(Message msg, MessageQueue mq, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQProducerImpl.send(msg, mq, timeout); + msg.setTopic(withNamespace(msg.getTopic())); + return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), timeout); } /** @@ -405,7 +462,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void send(Message msg, MessageQueue mq, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.send(msg, mq, sendCallback); + msg.setTopic(withNamespace(msg.getTopic())); + this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback); } /** @@ -422,7 +480,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout); + msg.setTopic(withNamespace(msg.getTopic())); + this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback, timeout); } /** @@ -437,7 +496,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.sendOneway(msg, mq); + msg.setTopic(withNamespace(msg.getTopic())); + this.defaultMQProducerImpl.sendOneway(msg, queueWithNamespace(mq)); } /** @@ -456,6 +516,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg, selector, arg); } @@ -476,6 +537,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg, selector, arg, timeout); } @@ -493,6 +555,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); } @@ -511,6 +574,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } @@ -527,6 +591,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.sendOneway(msg, selector, arg); } @@ -571,7 +636,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); + createTopic(key, withNamespace(newTopic), queueNum, 0); } /** @@ -587,7 +652,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag); } /** @@ -600,7 +665,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.defaultMQProducerImpl.searchOffset(mq, timestamp); + return this.defaultMQProducerImpl.searchOffset(queueWithNamespace(mq), timestamp); } /** @@ -615,7 +680,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Deprecated @Override public long maxOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQProducerImpl.maxOffset(mq); + return this.defaultMQProducerImpl.maxOffset(queueWithNamespace(mq)); } /** @@ -630,7 +695,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Deprecated @Override public long minOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQProducerImpl.minOffset(mq); + return this.defaultMQProducerImpl.minOffset(queueWithNamespace(mq)); } /** @@ -645,7 +710,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Deprecated @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - return this.defaultMQProducerImpl.earliestMsgStoreTime(mq); + return this.defaultMQProducerImpl.earliestMsgStoreTime(queueWithNamespace(mq)); } /** @@ -685,7 +750,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { - return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end); + return this.defaultMQProducerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end); } /** @@ -710,7 +775,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return this.viewMessage(msgId); } catch (Exception e) { } - return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId); + return this.defaultMQProducerImpl.queryMessageByUniqKey(withNamespace(topic), msgId); } @Override @@ -764,11 +829,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { for (Message message : msgBatch) { Validators.checkMessage(message, this); MessageClientIDSetter.setUniqID(message); + message.setTopic(withNamespace(message.getTopic())); } msgBatch.setBody(msgBatch.encode()); } catch (Exception e) { throw new MQClientException("Failed to initiate the MessageBatch", e); } + msgBatch.setTopic(withNamespace(msgBatch.getTopic())); return msgBatch; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index 8f6428b29c2ae700547d0b082a8d13d608e35bd1..9873aca89dc637c10bab7c03cf254cae6fa103c3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.client.producer; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.remoting.RPCHook; public class TransactionMQProducer extends DefaultMQProducer { @@ -35,11 +36,19 @@ public class TransactionMQProducer extends DefaultMQProducer { } public TransactionMQProducer(final String producerGroup) { - super(producerGroup); + this(null, producerGroup, null); + } + + public TransactionMQProducer(final String namespace, final String producerGroup) { + this(namespace, producerGroup, null); } public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { - super(producerGroup, rpcHook); + this(null, producerGroup, rpcHook); + } + + public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { + super(namespace, producerGroup, rpcHook); } @Override @@ -66,6 +75,7 @@ public class TransactionMQProducer extends DefaultMQProducer { throw new MQClientException("localTransactionBranchCheckListener is null", null); } + msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); } diff --git a/common/pom.xml b/common/pom.xml index 1cd4043e2123df6b39cd4d134471380b465541af..dbbdf5e48319c43e45f6ddea0cf66e49948fc7f8 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -37,5 +37,9 @@ ${project.groupId} rocketmq-remoting + + org.apache.commons + commons-lang3 + diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java index 287be13088f1d3122f7ced56729fe9389f797131..c9a133b4d0cfd4969b936fac7889b0788bc89edb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java @@ -161,6 +161,10 @@ public class Message implements Serializable { this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK)); } + public void setInstanceId(String instanceId) { + this.putProperty(MessageConst.PROPERTY_INSTANCE_ID, instanceId); + } + public int getFlag() { return flag; } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index bf5c807c7f3ec1c646bdd83e687316aa6c783b3b..aa8481643a88252731b195c68f3c6c7337dd1f5a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -44,6 +44,7 @@ public class MessageConst { public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET"; public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES"; public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS"; + public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID"; public static final String KEY_SEPARATOR = " "; @@ -72,5 +73,6 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES); STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP); + STRING_HASH_SET.add(PROPERTY_INSTANCE_ID); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..afd537612267ca04068e3338d8e67ea98fb3835b --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.MixAll; + +public class NamespaceUtil { + public static final char NAMESPACE_SEPARATOR = '%'; + public static final String STRING_BLANK = ""; + public static final int RETRY_PREFIX_LENGTH = MixAll.RETRY_GROUP_TOPIC_PREFIX.length(); + public static final int DLQ_PREFIX_LENGTH = MixAll.DLQ_GROUP_TOPIC_PREFIX.length(); + + /** + * Unpack namespace from resource, just like: + * (1) MQ_INST_XX%Topic_XXX --> Topic_XXX + * (2) %RETRY%MQ_INST_XX%GID_XXX --> %RETRY%GID_XXX + * + * @param resourceWithNamespace, topic/groupId with namespace. + * @return topic/groupId without namespace. + */ + public static String withoutNamespace(String resourceWithNamespace) { + if (StringUtils.isEmpty(resourceWithNamespace) || isSystemResource(resourceWithNamespace)) { + return resourceWithNamespace; + } + + StringBuffer strBuffer = new StringBuffer(); + if (isRetryTopic(resourceWithNamespace)) { + strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX); + } + if (isDLQTopic(resourceWithNamespace)) { + strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX); + } + + String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithNamespace); + int index = resourceWithoutRetryAndDLQ.indexOf(NAMESPACE_SEPARATOR); + if (index > 0) { + String resourceWithoutNamespace = resourceWithoutRetryAndDLQ.substring(index + 1); + return strBuffer.append(resourceWithoutNamespace).toString(); + } + + return resourceWithNamespace; + } + + /** + * If resource contains the namespace, unpack namespace from resource, just like: + * (1) (MQ_INST_XX1%Topic_XXX1, MQ_INST_XX1) --> Topic_XXX1 + * (2) (MQ_INST_XX2%Topic_XXX2, NULL) --> MQ_INST_XX2%Topic_XXX2 + * (3) (%RETRY%MQ_INST_XX1%GID_XXX1, MQ_INST_XX1) --> %RETRY%GID_XXX1 + * (4) (%RETRY%MQ_INST_XX2%GID_XXX2, MQ_INST_XX3) --> %RETRY%MQ_INST_XX2%GID_XXX2 + * + * @param resourceWithNamespace, topic/groupId with namespace. + * @param namespace, namespace to be unpacked. + * @return topic/groupId without namespace. + */ + public static String withoutNamespace(String resourceWithNamespace, String namespace) { + if (StringUtils.isEmpty(resourceWithNamespace) || StringUtils.isEmpty(namespace)) { + return resourceWithNamespace; + } + + String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithNamespace); + if (resourceWithoutRetryAndDLQ.startsWith(namespace + NAMESPACE_SEPARATOR)) { + return withoutNamespace(resourceWithNamespace); + } + + return resourceWithNamespace; + } + + public static String wrapNamespace(String namespace, String resourceWithOutNamespace) { + if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) { + return resourceWithOutNamespace; + } + + if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) { + return resourceWithOutNamespace; + } + + String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace); + StringBuffer strBuffer = new StringBuffer(); + + if (isRetryTopic(resourceWithOutNamespace)) { + strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX); + } + + if (isDLQTopic(resourceWithOutNamespace)) { + strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX); + } + + return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString(); + + } + + public static boolean isAlreadyWithNamespace(String resource, String namespace) { + if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resource) || isSystemResource(resource)) { + return false; + } + + String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resource); + + return resourceWithoutRetryAndDLQ.startsWith(namespace + NAMESPACE_SEPARATOR); + } + + public static String wrapNamespaceAndRetry(String namespace, String consumerGroup) { + if (StringUtils.isEmpty(consumerGroup)) { + return null; + } + + return new StringBuffer() + .append(MixAll.RETRY_GROUP_TOPIC_PREFIX) + .append(wrapNamespace(namespace, consumerGroup)) + .toString(); + } + + public static String getNamespaceFromResource(String resource) { + if (StringUtils.isEmpty(resource) || isSystemResource(resource)) { + return STRING_BLANK; + } + String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resource); + int index = resourceWithoutRetryAndDLQ.indexOf(NAMESPACE_SEPARATOR); + + return index > 0 ? resourceWithoutRetryAndDLQ.substring(0, index) : STRING_BLANK; + } + + private static String withOutRetryAndDLQ(String originalResource) { + if (StringUtils.isEmpty(originalResource)) { + return STRING_BLANK; + } + if (isRetryTopic(originalResource)) { + return originalResource.substring(RETRY_PREFIX_LENGTH); + } + + if (isDLQTopic(originalResource)) { + return originalResource.substring(DLQ_PREFIX_LENGTH); + } + + return originalResource; + } + + private static boolean isSystemResource(String resource) { + if (StringUtils.isEmpty(resource)) { + return false; + } + + if (MixAll.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) { + return true; + } + + return MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC.equals(resource); + } + + public static boolean isRetryTopic(String resource) { + return StringUtils.isNotBlank(resource) && resource.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + } + + public static boolean isDLQTopic(String resource) { + return StringUtils.isNotBlank(resource) && resource.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX); + } +} \ No newline at end of file diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/NamespaceUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/NamespaceUtilTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1120d9ed3d54955e25f0e41bc10770ff2b806e6b --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/protocol/NamespaceUtilTest.java @@ -0,0 +1,91 @@ +/** + * Copyright (C) 2010-2016 Alibaba Group Holding Limited + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol; + +import org.apache.rocketmq.common.MixAll; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author MQDevelopers + */ +public class NamespaceUtilTest { + + private static final String INSTANCE_ID = "MQ_INST_XXX"; + private static final String INSTANCE_ID_WRONG = "MQ_INST_XXX1"; + private static final String TOPIC = "TOPIC_XXX"; + private static final String GROUP_ID = "GID_XXX"; + private static final String SYSTEM_TOPIC = "rmq_sys_topic"; + private static final String GROUP_ID_WITH_NAMESPACE = INSTANCE_ID + NamespaceUtil.NAMESPACE_SEPARATOR + GROUP_ID; + private static final String TOPIC_WITH_NAMESPACE = INSTANCE_ID + NamespaceUtil.NAMESPACE_SEPARATOR + TOPIC; + private static final String RETRY_TOPIC = MixAll.RETRY_GROUP_TOPIC_PREFIX + GROUP_ID; + private static final String RETRY_TOPIC_WITH_NAMESPACE = + MixAll.RETRY_GROUP_TOPIC_PREFIX + INSTANCE_ID + NamespaceUtil.NAMESPACE_SEPARATOR + GROUP_ID; + private static final String DLQ_TOPIC = MixAll.DLQ_GROUP_TOPIC_PREFIX + GROUP_ID; + private static final String DLQ_TOPIC_WITH_NAMESPACE = + MixAll.DLQ_GROUP_TOPIC_PREFIX + INSTANCE_ID + NamespaceUtil.NAMESPACE_SEPARATOR + GROUP_ID; + + @Test + public void testWithoutNamespace() { + String topic = NamespaceUtil.withoutNamespace(TOPIC_WITH_NAMESPACE, INSTANCE_ID); + Assert.assertEquals(topic, TOPIC); + String topic1 = NamespaceUtil.withoutNamespace(TOPIC_WITH_NAMESPACE); + Assert.assertEquals(topic1, TOPIC); + String groupId = NamespaceUtil.withoutNamespace(GROUP_ID_WITH_NAMESPACE, INSTANCE_ID); + Assert.assertEquals(groupId, GROUP_ID); + String groupId1 = NamespaceUtil.withoutNamespace(GROUP_ID_WITH_NAMESPACE); + Assert.assertEquals(groupId1, GROUP_ID); + String consumerId = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE, INSTANCE_ID); + Assert.assertEquals(consumerId, RETRY_TOPIC); + String consumerId1 = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE); + Assert.assertEquals(consumerId1, RETRY_TOPIC); + String consumerId2 = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE, INSTANCE_ID_WRONG); + Assert.assertEquals(consumerId2, RETRY_TOPIC_WITH_NAMESPACE); + Assert.assertNotEquals(consumerId2, RETRY_TOPIC); + } + + @Test + public void testWrapNamespace() { + String topic1 = NamespaceUtil.wrapNamespace(INSTANCE_ID, TOPIC); + Assert.assertEquals(topic1, TOPIC_WITH_NAMESPACE); + String topicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, topic1); + Assert.assertEquals(topicWithNamespaceAgain, TOPIC_WITH_NAMESPACE); + //Wrap retry topic + String retryTopicWithNamespace = NamespaceUtil.wrapNamespace(INSTANCE_ID, RETRY_TOPIC); + Assert.assertEquals(retryTopicWithNamespace, RETRY_TOPIC_WITH_NAMESPACE); + String retryTopicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, retryTopicWithNamespace); + Assert.assertEquals(retryTopicWithNamespaceAgain, retryTopicWithNamespace); + //Wrap DLQ topic + String dlqTopicWithNamespace = NamespaceUtil.wrapNamespace(INSTANCE_ID, DLQ_TOPIC); + Assert.assertEquals(dlqTopicWithNamespace, DLQ_TOPIC_WITH_NAMESPACE); + String dlqTopicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, dlqTopicWithNamespace); + Assert.assertEquals(dlqTopicWithNamespaceAgain, dlqTopicWithNamespace); + Assert.assertEquals(dlqTopicWithNamespaceAgain, DLQ_TOPIC_WITH_NAMESPACE ); + //test system topic + String systemTopic = NamespaceUtil.wrapNamespace(INSTANCE_ID, SYSTEM_TOPIC); + Assert.assertEquals(systemTopic, SYSTEM_TOPIC); + } + + @Test + public void testGetNamespaceFromResource(){ + String namespaceExpectBlank = NamespaceUtil.getNamespaceFromResource(TOPIC); + Assert.assertEquals(namespaceExpectBlank, NamespaceUtil.STRING_BLANK); + String namespace = NamespaceUtil.getNamespaceFromResource(TOPIC_WITH_NAMESPACE); + Assert.assertEquals(namespace, INSTANCE_ID); + String namespaceFromRetryTopic = NamespaceUtil.getNamespaceFromResource(RETRY_TOPIC_WITH_NAMESPACE); + Assert.assertEquals(namespaceFromRetryTopic, INSTANCE_ID); + } +} \ No newline at end of file diff --git a/example/src/main/java/org/apache/rocketmq/example/namespace/ProducerWithNamespace.java b/example/src/main/java/org/apache/rocketmq/example/namespace/ProducerWithNamespace.java new file mode 100644 index 0000000000000000000000000000000000000000..ed47c96facea7153da1471c138bef7589c39a098 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/namespace/ProducerWithNamespace.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.namespace; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; + +public class ProducerWithNamespace { + public static void main(String[] args) throws Exception { + + DefaultMQProducer producer = new DefaultMQProducer("InstanceTest", "pidTest"); + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + for (int i = 0; i < 100; i++) { + Message message = new Message("topicTest", "tagTest", "Hello world".getBytes()); + try { + SendResult result = producer.send(message); + System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file diff --git a/example/src/main/java/org/apache/rocketmq/example/namespace/PullConsumerWithNamespace.java b/example/src/main/java/org/apache/rocketmq/example/namespace/PullConsumerWithNamespace.java new file mode 100644 index 0000000000000000000000000000000000000000..f8bf0230d5670105e63e59e0f7b5fc6362cc2070 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/namespace/PullConsumerWithNamespace.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.namespace; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.common.message.MessageQueue; + +public class PullConsumerWithNamespace { + private static final Map OFFSE_TABLE = new HashMap(); + + public static void main(String[] args) throws Exception { + DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("InstanceTest", "cidTest"); + pullConsumer.setNamesrvAddr("127.0.0.1:9876"); + pullConsumer.start(); + + Set mqs = pullConsumer.fetchSubscribeMessageQueues("topicTest"); + for (MessageQueue mq : mqs) { + System.out.printf("Consume from the topic: %s, queue: %s%n", mq.getTopic(), mq); + SINGLE_MQ: + while (true) { + try { + PullResult pullResult = + pullConsumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); + System.out.printf("%s%n", pullResult); + + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + switch (pullResult.getPullStatus()) { + case FOUND: + dealWithPullResult(pullResult); + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break SINGLE_MQ; + case OFFSET_ILLEGAL: + break; + default: + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + pullConsumer.shutdown(); + } + + private static long getMessageQueueOffset(MessageQueue mq) { + Long offset = OFFSE_TABLE.get(mq); + if (offset != null) { + return offset; + } + + return 0; + } + + private static void dealWithPullResult(PullResult pullResult) { + if (null == pullResult || pullResult.getMsgFoundList().isEmpty()) { + return; + } + pullResult.getMsgFoundList().stream().forEach( + (msg) -> System.out.printf("Topic is:%s, msgId is:%s%n" , msg.getTopic(), msg.getMsgId())); + } + + private static void putMessageQueueOffset(MessageQueue mq, long offset) { + OFFSE_TABLE.put(mq, offset); + } +} \ No newline at end of file diff --git a/example/src/main/java/org/apache/rocketmq/example/namespace/PushConsumerWithNamespace.java b/example/src/main/java/org/apache/rocketmq/example/namespace/PushConsumerWithNamespace.java new file mode 100644 index 0000000000000000000000000000000000000000..a0fdeeffbba784062c5049737389c045c7bc9bfc --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/namespace/PushConsumerWithNamespace.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.namespace; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; + +public class PushConsumerWithNamespace { + public static void main(String[] args) throws Exception { + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("InstanceTest", "cidTest"); + defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876"); + defaultMQPushConsumer.subscribe("topicTest", "*"); + defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> { + msgs.stream().forEach((msg) -> { + System.out.printf("Msg topic is:%s, MsgId is:%s, reconsumeTimes is:%s%n", msg.getTopic() , msg.getMsgId(), msg.getReconsumeTimes()); + }); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + }); + + defaultMQPushConsumer.start(); + } +} \ No newline at end of file