未验证 提交 9e05be69 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1122 from lollipopjin/RocketMQ-1120

[ISSUE #1120]Add new feature: support namespace
......@@ -35,6 +35,7 @@ public class ConsumeMessageContext {
private BrokerStatsManager.StatsType commercialRcvStats;
private int commercialRcvTimes;
private int commercialRcvSize;
private String namespace;
public String getConsumerGroup() {
return consumerGroup;
......@@ -147,4 +148,12 @@ public class ConsumeMessageContext {
public void setCommercialRcvSize(final int commercialRcvSize) {
this.commercialRcvSize = commercialRcvSize;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
......@@ -40,11 +40,12 @@ public class SendMessageContext {
private long bornTimeStamp;
private MessageType msgType = MessageType.Trans_msg_Commit;
private boolean isSuccess = false;
//For Commercial
private String commercialOwner;
private BrokerStatsManager.StatsType commercialSendStats;
private int commercialSendSize;
private int commercialSendTimes;
private String namespace;
public boolean isSuccess() {
return isSuccess;
......@@ -229,4 +230,12 @@ public class SendMessageContext {
public void setCommercialSendTimes(final int commercialSendTimes) {
this.commercialSendTimes = commercialSendTimes;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
......@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.broker.processor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
......@@ -27,11 +33,10 @@ import org.apache.rocketmq.common.constant.DBMsgConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
......@@ -40,18 +45,14 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.utils.ChannelUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......@@ -73,9 +74,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
if (!this.hasSendMessageHook()) {
return null;
}
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
SendMessageContext mqtraceContext;
mqtraceContext = new SendMessageContext();
mqtraceContext.setProducerGroup(requestHeader.getProducerGroup());
mqtraceContext.setNamespace(namespace);
mqtraceContext.setTopic(requestHeader.getTopic());
mqtraceContext.setMsgProps(requestHeader.getProperties());
mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
......@@ -253,7 +256,9 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
try {
final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
if (null != requestHeader) {
context.setNamespace(namespace);
context.setProducerGroup(requestHeader.getProducerGroup());
context.setTopic(requestHeader.getTopic());
context.setBodyLength(request.getBody().length);
......
......@@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
......@@ -33,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
......@@ -49,10 +54,6 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
......@@ -101,9 +102,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
......
......@@ -16,8 +16,14 @@
*/
package org.apache.rocketmq.client;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
......@@ -31,6 +37,7 @@ public class ClientConfig {
private String clientIP = RemotingUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace;
/**
* Pulling topic information interval from the named server
*/
......@@ -87,6 +94,38 @@ public class ClientConfig {
}
}
public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}
public Set<String> withNamespace(Set<String> resourceSet) {
Set<String> resourceWithNamespace = new HashSet<String>();
for (String resource : resourceSet) {
resourceWithNamespace.add(withNamespace(resource));
}
return resourceWithNamespace;
}
public String withoutNamespace(String resource) {
return NamespaceUtil.withoutNamespace(resource, this.getNamespace());
}
public Set<String> withoutNamespace(Set<String> resourceSet) {
Set<String> resourceWithoutNamespace = new HashSet<String>();
for (String resource : resourceSet) {
resourceWithoutNamespace.add(withoutNamespace(resource));
}
return resourceWithoutNamespace;
}
public MessageQueue queueWithNamespace(MessageQueue queue) {
if (StringUtils.isEmpty(this.getNamespace())) {
return queue;
}
return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
}
public void resetClientConfig(final ClientConfig cc) {
this.namesrvAddr = cc.namesrvAddr;
this.clientIP = cc.clientIP;
......@@ -99,6 +138,7 @@ public class ClientConfig {
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
this.useTLS = cc.useTLS;
this.namespace = cc.namespace;
this.language = cc.language;
}
......@@ -115,6 +155,7 @@ public class ClientConfig {
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
cc.useTLS = useTLS;
cc.namespace = namespace;
cc.language = language;
return cc;
}
......@@ -199,12 +240,20 @@ public class ClientConfig {
this.language = language;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
}
}
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -84,20 +85,34 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
private int maxReconsumeTimes = 16;
public DefaultMQPullConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null);
}
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
}
public DefaultMQPullConsumer(final String consumerGroup) {
this(consumerGroup, null);
this(null, consumerGroup, null);
}
public DefaultMQPullConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
}
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
this(null, consumerGroup, rpcHook);
}
public DefaultMQPullConsumer(final String namespace, final String consumerGroup) {
this(namespace, consumerGroup, null);
}
/**
* Constructor specifying namespace, consumer group and RPC hook.
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}
/**
......@@ -106,7 +121,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
/**
......@@ -115,7 +130,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
/**
......@@ -124,7 +139,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Deprecated
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
return this.defaultMQPullConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
}
/**
......@@ -133,7 +148,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.maxOffset(mq);
return this.defaultMQPullConsumerImpl.maxOffset(queueWithNamespace(mq));
}
/**
......@@ -142,7 +157,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.minOffset(mq);
return this.defaultMQPullConsumerImpl.minOffset(queueWithNamespace(mq));
}
/**
......@@ -151,7 +166,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
}
/**
......@@ -171,7 +186,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
return this.defaultMQPullConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
}
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
......@@ -239,7 +254,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
}
public void setRegisterTopics(Set<String> registerTopics) {
this.registerTopics = registerTopics;
this.registerTopics = withNamespace(registerTopics);
}
/**
......@@ -250,6 +265,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
......@@ -261,16 +277,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
}
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic);
return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
}
@Override
public void start() throws MQClientException {
this.setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPullConsumerImpl.start();
}
......@@ -282,7 +300,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Override
public void registerMessageQueueListener(String topic, MessageQueueListener listener) {
synchronized (this.registerTopics) {
this.registerTopics.add(topic);
this.registerTopics.add(withNamespace(topic));
if (listener != null) {
this.messageQueueListener = listener;
}
......@@ -292,80 +310,80 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Override
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums);
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums);
}
@Override
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout);
}
@Override
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums);
}
@Override
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout);
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, timeout);
}
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback);
}
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout);
}
@Override
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, pullCallback);
}
@Override
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout);
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, pullCallback, timeout);
}
@Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums);
return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums);
}
@Override
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback);
}
@Override
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset);
this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
}
@Override
public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, fromStore);
return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore);
}
@Override
public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic);
return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(withNamespace(topic));
}
@Override
......@@ -377,12 +395,13 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
} catch (Exception e) {
// Ignore
}
return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, uniqKey);
return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(withNamespace(topic), uniqKey);
}
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
}
......
......@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -262,7 +263,47 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Default constructor.
*/
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying namespace and consumer group.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consumer group.
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup) {
this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying namespace, consumer group and RPC hook .
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this(namespace, consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
}
/**
......@@ -274,48 +315,25 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy);
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
* Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
......@@ -323,7 +341,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
}
/**
......@@ -334,16 +352,51 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying consumer group.
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
*
* @param consumerGroup Consumer group.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy, enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying namespace, consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
......@@ -352,7 +405,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
/**
......@@ -361,7 +414,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
/**
......@@ -370,7 +423,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Deprecated
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
return this.defaultMQPushConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
}
/**
......@@ -379,7 +432,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(mq);
return this.defaultMQPushConsumerImpl.maxOffset(queueWithNamespace(mq));
}
/**
......@@ -388,7 +441,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.minOffset(mq);
return this.defaultMQPushConsumerImpl.minOffset(queueWithNamespace(mq));
}
/**
......@@ -397,7 +450,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
}
/**
......@@ -417,7 +470,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
return this.defaultMQPushConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
}
/**
......@@ -433,7 +486,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
} catch (Exception e) {
// Ignore
}
return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId);
return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(withNamespace(topic), msgId);
}
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
......@@ -573,7 +626,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
@Deprecated
public void setSubscription(Map<String, String> subscription) {
this.subscription = subscription;
Map<String, String> subscriptionWithNamespace = new HashMap<String, String>();
for (String topic : subscription.keySet()) {
subscriptionWithNamespace.put(withNamespace(topic), subscription.get(topic));
}
this.subscription = subscriptionWithNamespace;
}
/**
......@@ -593,6 +650,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
......@@ -615,12 +673,13 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
}
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
}
/**
......@@ -630,6 +689,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
......@@ -690,7 +750,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
/**
......@@ -702,7 +762,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
@Override
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), fullClassName, filterClassSource);
}
/**
......@@ -715,7 +775,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
@Override
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector);
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), messageSelector);
}
/**
......
......@@ -29,6 +29,7 @@ public class ConsumeMessageContext {
private String status;
private Object mqTraceContext;
private Map<String, String> props;
private String namespace;
public String getConsumerGroup() {
return consumerGroup;
......@@ -85,4 +86,12 @@ public class ConsumeMessageContext {
public void setStatus(String status) {
this.status = status;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
......@@ -37,6 +37,7 @@ public class SendMessageContext {
private Map<String, String> props;
private DefaultMQProducerImpl producer;
private MessageType msgType = MessageType.Normal_Msg;
private String namespace;
public MessageType getMsgType() {
return msgType;
......@@ -133,4 +134,12 @@ public class SendMessageContext {
public void setBornHost(String bornHost) {
this.bornHost = bornHost;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
......@@ -16,19 +16,21 @@
*/
package org.apache.rocketmq.client.impl;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
......@@ -41,6 +43,7 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestH
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
......@@ -91,6 +94,10 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
messageExt.setTopic(NamespaceUtil
.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
}
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -35,6 +36,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -136,7 +138,7 @@ public class MQAdminImpl {
if (topicRouteData != null) {
TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
if (topicPublishInfo != null && topicPublishInfo.ok()) {
return topicPublishInfo.getMessageQueueList();
return parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
}
}
} catch (Exception e) {
......@@ -146,6 +148,16 @@ public class MQAdminImpl {
throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
public List<MessageQueue> parsePublishMessageQueues(List<MessageQueue> messageQueueList) {
List<MessageQueue> resultQueues = new ArrayList<MessageQueue>();
for (MessageQueue queue : messageQueueList) {
String userTopic = NamespaceUtil.withoutNamespace(queue.getTopic(), this.mQClientFactory.getClientConfig().getNamespace());
resultQueues.add(new MessageQueue(userTopic, queue.getBrokerName(), queue.getQueueId()));
}
return resultQueues;
}
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
......@@ -407,6 +419,13 @@ public class MQAdminImpl {
}
}
//If namespace not null , reset Topic without namespace.
for (MessageExt messageExt : messageList) {
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.mQClientFactory.getClientConfig().getNamespace()));
}
}
if (!messageList.isEmpty()) {
return new QueryResult(indexLastUpdateTimestamp, messageList);
} else {
......
......@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
......@@ -56,6 +58,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
......@@ -522,7 +525,13 @@ public class MQClientAPIImpl {
SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) {
......@@ -665,6 +674,10 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: {
ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true);
//If namespace not null , reset Topic without namespace.
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.clientConfig.getNamespace()));
}
return messageExt;
}
default:
......
......@@ -29,6 +29,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
......@@ -39,13 +40,12 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
......@@ -157,7 +157,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq);
this.resetRetryTopic(msgs);
this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);
final long beginTime = System.currentTimeMillis();
......@@ -236,15 +236,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
}
public void resetRetryTopic(final List<MessageExt> msgs) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
msg.setTopic(retryTopic);
}
}
}
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
......@@ -326,6 +317,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();
// Wrap topic with namespace before sending back message.
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
......@@ -392,10 +385,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
......@@ -408,7 +403,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
......
......@@ -26,6 +26,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
......@@ -37,6 +39,7 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -142,6 +145,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq);
this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);
final long beginTime = System.currentTimeMillis();
log.info("consumeMessageDirectly receive new message: {}", msg);
......@@ -380,6 +385,14 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
return false;
}
public void resetNamespace(final List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
}
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
......@@ -439,6 +452,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
......@@ -449,6 +463,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
......
......@@ -50,6 +50,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -125,7 +126,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
}
}
return mqResult;
return parseSubscribeMessageQueues(mqResult);
}
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
......@@ -135,7 +136,23 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
// check if has info in memory, otherwise invoke api.
Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
if (null == result) {
result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
}
return parseSubscribeMessageQueues(result);
}
public Set<MessageQueue> parseSubscribeMessageQueues(Set<MessageQueue> queueSet) {
Set<MessageQueue> resultQueues = new HashSet<MessageQueue>();
for (MessageQueue messageQueue : queueSet) {
String userTopic = NamespaceUtil.withoutNamespace(messageQueue.getTopic(),
this.defaultMQPullConsumer.getNamespace());
resultQueues.add(new MessageQueue(userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId()));
}
return resultQueues;
}
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
......@@ -244,9 +261,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
//If namespace not null , reset Topic without namespace.
this.resetTopic(pullResult.getMsgFoundList());
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
......@@ -259,6 +279,20 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return pullResult;
}
public void resetTopic(List<MessageExt> msgList) {
if (null == msgList || msgList.size() == 0) {
return;
}
//If namespace not null , reset Topic without namespace.
for (MessageExt messageExt : msgList) {
if (null != this.getDefaultMQPullConsumer().getNamespace()) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.defaultMQPullConsumer.getNamespace()));
}
}
}
public void subscriptionAutomatically(final String topic) {
if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
try {
......@@ -474,8 +508,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
@Override
public void onSuccess(PullResult pullResult) {
pullCallback
.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
PullResult userPullResult = DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
resetTopic(userPullResult.getMsgFoundList());
pullCallback.onSuccess(userPullResult);
}
@Override
......@@ -558,6 +593,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPullConsumer.getNamespace()));
}
}
......
......@@ -27,6 +27,8 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
......@@ -56,6 +58,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -169,7 +172,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
throw new MQClientException("The topic[" + topic + "] not exist", null);
}
return result;
return parseSubscribeMessageQueues(result);
}
public Set<MessageQueue> parseSubscribeMessageQueues(Set<MessageQueue> messageQueueList) {
Set<MessageQueue> resultQueues = new HashSet<MessageQueue>();
for (MessageQueue queue : messageQueueList) {
String userTopic = NamespaceUtil.withoutNamespace(queue.getTopic(), this.defaultMQPushConsumer.getNamespace());
resultQueues.add(new MessageQueue(userTopic, queue.getBrokerName(), queue.getQueueId()));
}
return resultQueues;
}
public DefaultMQPushConsumer getDefaultMQPushConsumer() {
......@@ -517,6 +530,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
......@@ -1131,6 +1146,20 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return queueTimeSpan;
}
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
msg.setTopic(retryTopic);
}
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
}
public ConsumeMessageService getConsumeMessageService() {
return consumeMessageService;
}
......
......@@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
......@@ -63,6 +65,7 @@ import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
......@@ -362,6 +365,26 @@ public class MQClientInstance {
}
}
/**
*
* @param offsetTable
* @param namespace
* @return newOffsetTable
*/
public Map<MessageQueue, Long> parseOffsetTableFromBroker(Map<MessageQueue, Long> offsetTable, String namespace) {
HashMap<MessageQueue, Long> newOffsetTable = new HashMap<MessageQueue, Long>();
if (StringUtils.isNotEmpty(namespace)) {
for (Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
MessageQueue queue = entry.getKey();
queue.setTopic(NamespaceUtil.withoutNamespace(queue.getTopic(), namespace));
newOffsetTable.put(queue, entry.getValue());
}
} else {
newOffsetTable.putAll(offsetTable);
}
return newOffsetTable;
}
/**
* Remove offline broker
*/
......@@ -1220,4 +1243,8 @@ public class MQClientInstance {
public NettyClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
public ClientConfig getClientConfig() {
return clientConfig;
}
}
......@@ -73,6 +73,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
......@@ -543,6 +544,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
......@@ -699,6 +704,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
......@@ -732,6 +743,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
......@@ -774,13 +786,24 @@ public class DefaultMQProducerImpl implements MQProducerInner {
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
......@@ -846,6 +869,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
......@@ -1059,7 +1083,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
......@@ -1323,4 +1353,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable);
}
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
}
}
......@@ -134,22 +134,29 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Default constructor.
*/
public DefaultMQProducer() {
this(MixAll.DEFAULT_PRODUCER_GROUP, null);
this(null, MixAll.DEFAULT_PRODUCER_GROUP, null);
}
/**
* Constructor specifying both producer group and RPC hook.
* Constructor specifying the RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
public DefaultMQProducer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
}
/**
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}
/**
* Constructor specifying producer group, RPC hook, enabled msgTrace flag and customized trace topic name.
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
......@@ -178,10 +185,34 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying producer group.
*
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String producerGroup) {
this(producerGroup, null);
public DefaultMQProducer(final String namespace, final String producerGroup) {
this(namespace, producerGroup, null);
}
/**
* Constructor specifying both producer group and RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this(null, producerGroup, rpcHook);
}
/**
* Constructor specifying namespace, producer group and RPC hook.
*
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
/**
......@@ -191,7 +222,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
this(producerGroup, null, enableMsgTrace, null);
this(null, producerGroup, null, enableMsgTrace, null);
}
/**
......@@ -203,16 +234,34 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(producerGroup, null, enableMsgTrace, customizedTraceTopic);
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying the RPC hook.
* Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQProducer(RPCHook rpcHook) {
this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
......@@ -229,6 +278,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
......@@ -259,7 +309,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
return this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic));
}
/**
......@@ -281,6 +331,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
......@@ -299,6 +350,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, timeout);
}
......@@ -322,6 +374,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback);
}
......@@ -338,6 +391,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
......@@ -352,6 +406,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneway(msg);
}
......@@ -370,7 +425,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
}
/**
......@@ -389,7 +445,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq, timeout);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), timeout);
}
/**
......@@ -405,7 +462,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback);
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback);
}
/**
......@@ -422,7 +480,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback, timeout);
}
/**
......@@ -437,7 +496,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, mq);
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneway(msg, queueWithNamespace(mq));
}
/**
......@@ -456,6 +516,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
......@@ -476,6 +537,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
}
......@@ -493,6 +555,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
}
......@@ -511,6 +574,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
......@@ -527,6 +591,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
}
......@@ -571,7 +636,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
/**
......@@ -587,7 +652,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
/**
......@@ -600,7 +665,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
return this.defaultMQProducerImpl.searchOffset(queueWithNamespace(mq), timestamp);
}
/**
......@@ -615,7 +680,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.maxOffset(mq);
return this.defaultMQProducerImpl.maxOffset(queueWithNamespace(mq));
}
/**
......@@ -630,7 +695,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.minOffset(mq);
return this.defaultMQProducerImpl.minOffset(queueWithNamespace(mq));
}
/**
......@@ -645,7 +710,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
return this.defaultMQProducerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
}
/**
......@@ -685,7 +750,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
return this.defaultMQProducerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
}
/**
......@@ -710,7 +775,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return this.viewMessage(msgId);
} catch (Exception e) {
}
return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
return this.defaultMQProducerImpl.queryMessageByUniqKey(withNamespace(topic), msgId);
}
@Override
......@@ -764,11 +829,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
message.setTopic(withNamespace(message.getTopic()));
}
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
return msgBatch;
}
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.producer;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.RPCHook;
public class TransactionMQProducer extends DefaultMQProducer {
......@@ -35,11 +36,19 @@ public class TransactionMQProducer extends DefaultMQProducer {
}
public TransactionMQProducer(final String producerGroup) {
super(producerGroup);
this(null, producerGroup, null);
}
public TransactionMQProducer(final String namespace, final String producerGroup) {
this(namespace, producerGroup, null);
}
public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
super(producerGroup, rpcHook);
this(null, producerGroup, rpcHook);
}
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
super(namespace, producerGroup, rpcHook);
}
@Override
......@@ -66,6 +75,7 @@ public class TransactionMQProducer extends DefaultMQProducer {
throw new MQClientException("localTransactionBranchCheckListener is null", null);
}
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
}
......
......@@ -37,5 +37,9 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
......@@ -161,6 +161,10 @@ public class Message implements Serializable {
this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
}
public void setInstanceId(String instanceId) {
this.putProperty(MessageConst.PROPERTY_INSTANCE_ID, instanceId);
}
public int getFlag() {
return flag;
}
......
......@@ -44,6 +44,7 @@ public class MessageConst {
public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
public static final String KEY_SEPARATOR = " ";
......@@ -72,5 +73,6 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
public class NamespaceUtil {
public static final char NAMESPACE_SEPARATOR = '%';
public static final String STRING_BLANK = "";
public static final int RETRY_PREFIX_LENGTH = MixAll.RETRY_GROUP_TOPIC_PREFIX.length();
public static final int DLQ_PREFIX_LENGTH = MixAll.DLQ_GROUP_TOPIC_PREFIX.length();
/**
* Unpack namespace from resource, just like:
* (1) MQ_INST_XX%Topic_XXX --> Topic_XXX
* (2) %RETRY%MQ_INST_XX%GID_XXX --> %RETRY%GID_XXX
*
* @param resourceWithNamespace, topic/groupId with namespace.
* @return topic/groupId without namespace.
*/
public static String withoutNamespace(String resourceWithNamespace) {
if (StringUtils.isEmpty(resourceWithNamespace) || isSystemResource(resourceWithNamespace)) {
return resourceWithNamespace;
}
StringBuffer strBuffer = new StringBuffer();
if (isRetryTopic(resourceWithNamespace)) {
strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
}
if (isDLQTopic(resourceWithNamespace)) {
strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
}
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithNamespace);
int index = resourceWithoutRetryAndDLQ.indexOf(NAMESPACE_SEPARATOR);
if (index > 0) {
String resourceWithoutNamespace = resourceWithoutRetryAndDLQ.substring(index + 1);
return strBuffer.append(resourceWithoutNamespace).toString();
}
return resourceWithNamespace;
}
/**
* If resource contains the namespace, unpack namespace from resource, just like:
* (1) (MQ_INST_XX1%Topic_XXX1, MQ_INST_XX1) --> Topic_XXX1
* (2) (MQ_INST_XX2%Topic_XXX2, NULL) --> MQ_INST_XX2%Topic_XXX2
* (3) (%RETRY%MQ_INST_XX1%GID_XXX1, MQ_INST_XX1) --> %RETRY%GID_XXX1
* (4) (%RETRY%MQ_INST_XX2%GID_XXX2, MQ_INST_XX3) --> %RETRY%MQ_INST_XX2%GID_XXX2
*
* @param resourceWithNamespace, topic/groupId with namespace.
* @param namespace, namespace to be unpacked.
* @return topic/groupId without namespace.
*/
public static String withoutNamespace(String resourceWithNamespace, String namespace) {
if (StringUtils.isEmpty(resourceWithNamespace) || StringUtils.isEmpty(namespace)) {
return resourceWithNamespace;
}
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithNamespace);
if (resourceWithoutRetryAndDLQ.startsWith(namespace + NAMESPACE_SEPARATOR)) {
return withoutNamespace(resourceWithNamespace);
}
return resourceWithNamespace;
}
public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {
if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {
return resourceWithOutNamespace;
}
if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {
return resourceWithOutNamespace;
}
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
StringBuffer strBuffer = new StringBuffer();
if (isRetryTopic(resourceWithOutNamespace)) {
strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
}
if (isDLQTopic(resourceWithOutNamespace)) {
strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
}
return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();
}
public static boolean isAlreadyWithNamespace(String resource, String namespace) {
if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resource) || isSystemResource(resource)) {
return false;
}
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resource);
return resourceWithoutRetryAndDLQ.startsWith(namespace + NAMESPACE_SEPARATOR);
}
public static String wrapNamespaceAndRetry(String namespace, String consumerGroup) {
if (StringUtils.isEmpty(consumerGroup)) {
return null;
}
return new StringBuffer()
.append(MixAll.RETRY_GROUP_TOPIC_PREFIX)
.append(wrapNamespace(namespace, consumerGroup))
.toString();
}
public static String getNamespaceFromResource(String resource) {
if (StringUtils.isEmpty(resource) || isSystemResource(resource)) {
return STRING_BLANK;
}
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resource);
int index = resourceWithoutRetryAndDLQ.indexOf(NAMESPACE_SEPARATOR);
return index > 0 ? resourceWithoutRetryAndDLQ.substring(0, index) : STRING_BLANK;
}
private static String withOutRetryAndDLQ(String originalResource) {
if (StringUtils.isEmpty(originalResource)) {
return STRING_BLANK;
}
if (isRetryTopic(originalResource)) {
return originalResource.substring(RETRY_PREFIX_LENGTH);
}
if (isDLQTopic(originalResource)) {
return originalResource.substring(DLQ_PREFIX_LENGTH);
}
return originalResource;
}
private static boolean isSystemResource(String resource) {
if (StringUtils.isEmpty(resource)) {
return false;
}
if (MixAll.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) {
return true;
}
return MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC.equals(resource);
}
public static boolean isRetryTopic(String resource) {
return StringUtils.isNotBlank(resource) && resource.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
}
public static boolean isDLQTopic(String resource) {
return StringUtils.isNotBlank(resource) && resource.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX);
}
}
\ No newline at end of file
/**
* Copyright (C) 2010-2016 Alibaba Group Holding Limited
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.MixAll;
import org.junit.Assert;
import org.junit.Test;
/**
* @author MQDevelopers
*/
public class NamespaceUtilTest {
private static final String INSTANCE_ID = "MQ_INST_XXX";
private static final String INSTANCE_ID_WRONG = "MQ_INST_XXX1";
private static final String TOPIC = "TOPIC_XXX";
private static final String GROUP_ID = "GID_XXX";
private static final String SYSTEM_TOPIC = "rmq_sys_topic";
private static final String GROUP_ID_WITH_NAMESPACE = INSTANCE_ID + NamespaceUtil.NAMESPACE_SEPARATOR + GROUP_ID;
private static final String TOPIC_WITH_NAMESPACE = INSTANCE_ID + NamespaceUtil.NAMESPACE_SEPARATOR + TOPIC;
private static final String RETRY_TOPIC = MixAll.RETRY_GROUP_TOPIC_PREFIX + GROUP_ID;
private static final String RETRY_TOPIC_WITH_NAMESPACE =
MixAll.RETRY_GROUP_TOPIC_PREFIX + INSTANCE_ID + NamespaceUtil.NAMESPACE_SEPARATOR + GROUP_ID;
private static final String DLQ_TOPIC = MixAll.DLQ_GROUP_TOPIC_PREFIX + GROUP_ID;
private static final String DLQ_TOPIC_WITH_NAMESPACE =
MixAll.DLQ_GROUP_TOPIC_PREFIX + INSTANCE_ID + NamespaceUtil.NAMESPACE_SEPARATOR + GROUP_ID;
@Test
public void testWithoutNamespace() {
String topic = NamespaceUtil.withoutNamespace(TOPIC_WITH_NAMESPACE, INSTANCE_ID);
Assert.assertEquals(topic, TOPIC);
String topic1 = NamespaceUtil.withoutNamespace(TOPIC_WITH_NAMESPACE);
Assert.assertEquals(topic1, TOPIC);
String groupId = NamespaceUtil.withoutNamespace(GROUP_ID_WITH_NAMESPACE, INSTANCE_ID);
Assert.assertEquals(groupId, GROUP_ID);
String groupId1 = NamespaceUtil.withoutNamespace(GROUP_ID_WITH_NAMESPACE);
Assert.assertEquals(groupId1, GROUP_ID);
String consumerId = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE, INSTANCE_ID);
Assert.assertEquals(consumerId, RETRY_TOPIC);
String consumerId1 = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE);
Assert.assertEquals(consumerId1, RETRY_TOPIC);
String consumerId2 = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE, INSTANCE_ID_WRONG);
Assert.assertEquals(consumerId2, RETRY_TOPIC_WITH_NAMESPACE);
Assert.assertNotEquals(consumerId2, RETRY_TOPIC);
}
@Test
public void testWrapNamespace() {
String topic1 = NamespaceUtil.wrapNamespace(INSTANCE_ID, TOPIC);
Assert.assertEquals(topic1, TOPIC_WITH_NAMESPACE);
String topicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, topic1);
Assert.assertEquals(topicWithNamespaceAgain, TOPIC_WITH_NAMESPACE);
//Wrap retry topic
String retryTopicWithNamespace = NamespaceUtil.wrapNamespace(INSTANCE_ID, RETRY_TOPIC);
Assert.assertEquals(retryTopicWithNamespace, RETRY_TOPIC_WITH_NAMESPACE);
String retryTopicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, retryTopicWithNamespace);
Assert.assertEquals(retryTopicWithNamespaceAgain, retryTopicWithNamespace);
//Wrap DLQ topic
String dlqTopicWithNamespace = NamespaceUtil.wrapNamespace(INSTANCE_ID, DLQ_TOPIC);
Assert.assertEquals(dlqTopicWithNamespace, DLQ_TOPIC_WITH_NAMESPACE);
String dlqTopicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, dlqTopicWithNamespace);
Assert.assertEquals(dlqTopicWithNamespaceAgain, dlqTopicWithNamespace);
Assert.assertEquals(dlqTopicWithNamespaceAgain, DLQ_TOPIC_WITH_NAMESPACE );
//test system topic
String systemTopic = NamespaceUtil.wrapNamespace(INSTANCE_ID, SYSTEM_TOPIC);
Assert.assertEquals(systemTopic, SYSTEM_TOPIC);
}
@Test
public void testGetNamespaceFromResource(){
String namespaceExpectBlank = NamespaceUtil.getNamespaceFromResource(TOPIC);
Assert.assertEquals(namespaceExpectBlank, NamespaceUtil.STRING_BLANK);
String namespace = NamespaceUtil.getNamespaceFromResource(TOPIC_WITH_NAMESPACE);
Assert.assertEquals(namespace, INSTANCE_ID);
String namespaceFromRetryTopic = NamespaceUtil.getNamespaceFromResource(RETRY_TOPIC_WITH_NAMESPACE);
Assert.assertEquals(namespaceFromRetryTopic, INSTANCE_ID);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.namespace;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerWithNamespace {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("InstanceTest", "pidTest");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message message = new Message("topicTest", "tagTest", "Hello world".getBytes());
try {
SendResult result = producer.send(message);
System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.namespace;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageQueue;
public class PullConsumerWithNamespace {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws Exception {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("InstanceTest", "cidTest");
pullConsumer.setNamesrvAddr("127.0.0.1:9876");
pullConsumer.start();
Set<MessageQueue> mqs = pullConsumer.fetchSubscribeMessageQueues("topicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the topic: %s, queue: %s%n", mq.getTopic(), mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
pullConsumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
dealWithPullResult(pullResult);
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
pullConsumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null) {
return offset;
}
return 0;
}
private static void dealWithPullResult(PullResult pullResult) {
if (null == pullResult || pullResult.getMsgFoundList().isEmpty()) {
return;
}
pullResult.getMsgFoundList().stream().forEach(
(msg) -> System.out.printf("Topic is:%s, msgId is:%s%n" , msg.getTopic(), msg.getMsgId()));
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.namespace;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
public class PushConsumerWithNamespace {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("InstanceTest", "cidTest");
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
defaultMQPushConsumer.subscribe("topicTest", "*");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
msgs.stream().forEach((msg) -> {
System.out.printf("Msg topic is:%s, MsgId is:%s, reconsumeTimes is:%s%n", msg.getTopic() , msg.getMsgId(), msg.getReconsumeTimes());
});
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
});
defaultMQPushConsumer.start();
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册