提交 5e33f313 编写于 作者: H huzongtang

Merge branch 'master' of https://github.com/apache/rocketmq

dist: trusty
......@@ -10,7 +10,7 @@ Nor is code the only way to contribute to the project. We strongly value documen
To submit a change for inclusion, please do the following:
#### If the change is non-trivial please include some unit tests that cover the new functionality.
#### If you are introducing a completely new feature or API it is a good idea to start a wiki and get consensus on the basic design first.
#### If you are introducing a completely new feature or API it is a good idea to start a [RIP](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) and get consensus on the basic design first.
#### It is our job to follow up on patches in a timely fashion. Nag us if we aren't doing our job (sometimes we drop things).
## Becoming a Committer
......@@ -19,9 +19,8 @@ We are always interested in adding new contributors. What we look for are series
Nowadays,we have several important contribution points:
#### Wiki & JavaDoc
#### RocketMQ Console
#### RocketMQ SDK(C++\.Net\Php\Python\Go\Node.js)
#### RocketMQ MySQL(Oracle\PostgreSQL\Redis\MongoDB\HBase\MSSQL) Replicator
#### RocketMQ Connectors
##### Prerequisite
If you want to contribute the above listing points, you must abide our some prerequisites:
......@@ -51,3 +51,23 @@ We always welcome new contributions, whether for trivial cleanups, [big new feat
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
## Export Control Notice
This distribution includes cryptographic software. The country in which you currently reside may have
restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
BEFORE using any encryption software, please check your country's laws, regulations and policies concerning
the import, possession, or use, and re-export of encryption software, to see if this is permitted. See
<http://www.wassenaar.org/> for more information.
The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this
software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software
using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache
Software Foundation distribution makes it eligible for export under the License Exception ENC Technology
Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for
both object code and source code.
The following provides more details on the included cryptographic software:
This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
support authentication, and encryption and decryption of data sent across the network between
......@@ -13,7 +13,7 @@
<name>rocketmq-acl ${project.version}</name>
......@@ -16,12 +16,18 @@
package org.apache.rocketmq.acl.common;
import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.yaml.snakeyaml.Yaml;
......@@ -29,6 +35,8 @@ import static org.apache.rocketmq.acl.common.SessionCredentials.CHARSET;
public class AclUtils {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static byte[] combineRequestContent(RemotingCommand request, SortedMap<String, String> fieldsMap) {
try {
StringBuilder sb = new StringBuilder("");
......@@ -124,17 +132,44 @@ public class AclUtils {
try {
fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz);
} catch (FileNotFoundException ignore) {
return null;
} catch (Exception e) {
throw new AclException(String.format("The file for Plain mode was not found , paths %s", path), e);
throw new AclException(e.getMessage());
} finally {
if (fis != null) {
try {
} catch (IOException e) {
throw new AclException("close transport fileInputStream Exception", e);
} catch (IOException ignore) {
public static RPCHook getAclRPCHook(String fileName) {
JSONObject yamlDataObject = null;
try {
yamlDataObject = AclUtils.getYamlDataObject(fileName,
} catch (Exception e) {
log.error("convert yaml file to data object error, ",e);
return null;
if (yamlDataObject == null || yamlDataObject.isEmpty()) {
log.warn("Cannot find conf file :{}, acl isn't be enabled." ,fileName);
return null;
String accessKey = yamlDataObject.getString("accessKey");
String secretKey = yamlDataObject.getString("secretKey");
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
log.warn("AccessKey or secretKey is blank, the acl is not enabled.");
return null;
return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
......@@ -53,11 +53,13 @@ public class PlainAccessValidator implements AccessValidator {
if (request.getExtFields() == null) {
throw new AclException("request's extFields value is null");
//If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern)
//The following logic codes depend on the request's extFields not to be null.
return accessResource;
......@@ -116,6 +118,7 @@ public class PlainAccessValidator implements AccessValidator {
} catch (Throwable t) {
throw new AclException(t.getMessage(), t);
// Content
SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
......@@ -16,10 +16,12 @@
package org.apache.rocketmq.acl.common;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Assert;
import org.junit.Test;
......@@ -133,9 +135,34 @@ public class AclUtilsTest {
public void getYamlDataIgnoreFileNotFoundExceptionTest() {
JSONObject yamlDataObject = AclUtils.getYamlDataObject("plain_acl.yml", JSONObject.class);
Assert.assertTrue(yamlDataObject == null);
@Test(expected = Exception.class)
public void getYamlDataObjectExceptionTest() {
public void getYamlDataExceptionTest() {
AclUtils.getYamlDataObject("plain_acl.yml", Map.class);
AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl_format_error.yml", Map.class);
public void getAclRPCHookTest() {
RPCHook errorContRPCHook = AclUtils.getAclRPCHook("src/test/resources/conf/plain_acl_format_error.yml");
RPCHook noFileRPCHook = AclUtils.getAclRPCHook("src/test/resources/plain_acl_format_error1.yml");
RPCHook emptyContRPCHook = AclUtils.getAclRPCHook("src/test/resources/conf/plain_acl_null.yml");
RPCHook incompleteContRPCHook = AclUtils.getAclRPCHook("src/test/resources/conf/plain_acl_incomplete.yml");
......@@ -116,7 +116,7 @@ public class PlainAccessValidatorTest {
@Test(expected = AclException.class)
public void validateForAdminCommandWithOutAclRPCHook() {
RemotingCommand consumerOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
plainAccessValidator.parse(consumerOffsetAdminRequest, "");
......@@ -284,4 +284,17 @@ public class PlainAccessValidatorTest {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "");
public void validateGetAllTopicConfigTest() {
String whiteRemoteAddress = "";
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
ByteBuffer buf = remotingCommand.encodeHeader();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress);
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
date 2015-02-01
- name: Jai
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
- accessKey: rocketmq2
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
\ No newline at end of file
......@@ -13,7 +13,7 @@
......@@ -35,6 +35,7 @@ public class ConsumeMessageContext {
private BrokerStatsManager.StatsType commercialRcvStats;
private int commercialRcvTimes;
private int commercialRcvSize;
private String namespace;
public String getConsumerGroup() {
return consumerGroup;
......@@ -147,4 +148,12 @@ public class ConsumeMessageContext {
public void setCommercialRcvSize(final int commercialRcvSize) {
this.commercialRcvSize = commercialRcvSize;
public String getNamespace() {
return namespace;
public void setNamespace(String namespace) {
this.namespace = namespace;
......@@ -40,11 +40,12 @@ public class SendMessageContext {
private long bornTimeStamp;
private MessageType msgType = MessageType.Trans_msg_Commit;
private boolean isSuccess = false;
//For Commercial
private String commercialOwner;
private BrokerStatsManager.StatsType commercialSendStats;
private int commercialSendSize;
private int commercialSendTimes;
private String namespace;
public boolean isSuccess() {
return isSuccess;
......@@ -229,4 +230,12 @@ public class SendMessageContext {
public void setCommercialSendTimes(final int commercialSendTimes) {
this.commercialSendTimes = commercialSendTimes;
public String getNamespace() {
return namespace;
public void setNamespace(String namespace) {
this.namespace = namespace;
......@@ -16,6 +16,12 @@
package org.apache.rocketmq.broker.processor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
......@@ -27,11 +33,10 @@ import org.apache.rocketmq.common.constant.DBMsgConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
......@@ -40,18 +45,14 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.utils.ChannelUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......@@ -73,9 +74,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
if (!this.hasSendMessageHook()) {
return null;
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
SendMessageContext mqtraceContext;
mqtraceContext = new SendMessageContext();
......@@ -253,7 +256,9 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
try {
final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
if (null != requestHeader) {
......@@ -16,6 +16,10 @@
package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
......@@ -33,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
......@@ -49,10 +54,6 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
......@@ -101,9 +102,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
final ConsumerSendMsgBackRequestHeader requestHeader =
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
ConsumeMessageContext context = new ConsumeMessageContext();
......@@ -19,7 +19,7 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.client;
* Used for set access channel, if need migrate the rocketmq service to cloud, it is We recommend set the value with
* "CLOUD". otherwise set with "LOCAL", especially used the message trace feature.
public enum AccessChannel {
* Means connect to private IDC cluster.
* Means connect to Cloud service.
......@@ -16,8 +16,13 @@
package org.apache.rocketmq.client;
import org.apache.rocketmq.common.MixAll;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.utils.NameServerAddressUtils;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
......@@ -27,10 +32,13 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class ClientConfig {
public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
private String clientIP = RemotingUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace;
protected AccessChannel accessChannel = AccessChannel.LOCAL;
* Pulling topic information interval from the named server
......@@ -45,7 +53,7 @@ public class ClientConfig {
private int persistConsumerOffsetInterval = 1000 * 5;
private boolean unitMode = false;
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
private boolean useTLS = TlsSystemConfig.tlsEnable;
......@@ -87,6 +95,38 @@ public class ClientConfig {
public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
public Set<String> withNamespace(Set<String> resourceSet) {
Set<String> resourceWithNamespace = new HashSet<String>();
for (String resource : resourceSet) {
return resourceWithNamespace;
public String withoutNamespace(String resource) {
return NamespaceUtil.withoutNamespace(resource, this.getNamespace());
public Set<String> withoutNamespace(Set<String> resourceSet) {
Set<String> resourceWithoutNamespace = new HashSet<String>();
for (String resource : resourceSet) {
return resourceWithoutNamespace;
public MessageQueue queueWithNamespace(MessageQueue queue) {
if (StringUtils.isEmpty(this.getNamespace())) {
return queue;
return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
public void resetClientConfig(final ClientConfig cc) {
this.namesrvAddr = cc.namesrvAddr;
this.clientIP = cc.clientIP;
......@@ -99,6 +139,7 @@ public class ClientConfig {
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
this.useTLS = cc.useTLS;
this.namespace = cc.namespace;
this.language = cc.language;
......@@ -115,14 +156,22 @@ public class ClientConfig {
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
cc.useTLS = useTLS;
cc.namespace = namespace;
cc.language = language;
return cc;
public String getNamesrvAddr() {
if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.NAMESRV_ENDPOINT_PATTERN.matcher(namesrvAddr.trim()).matches()) {
return namesrvAddr.substring(NameServerAddressUtils.ENDPOINT_PREFIX.length());
return namesrvAddr;
* Domain name mode access way does not support the delimiter(;), and only one domain name can be set.
* @param namesrvAddr name server address
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
......@@ -199,12 +248,37 @@ public class ClientConfig {
this.language = language;
public String getNamespace() {
if (StringUtils.isNotEmpty(namespace)) {
return namespace;
if (StringUtils.isNotEmpty(this.namesrvAddr)) {
if (NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr)) {
return NameServerAddressUtils.parseInstanceIdFromEndpoint(namesrvAddr);
return namespace;
public void setNamespace(String namespace) {
this.namespace = namespace;
public AccessChannel getAccessChannel() {
return this.accessChannel;
public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -40,18 +41,16 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
* Do the same thing for the same Group, the application must be set,and
* guarantee Globally unique
* Do the same thing for the same Group, the application must be set,and guarantee Globally unique
private String consumerGroup;
* Long polling mode, the Consumer connection max suspend time, it is not
* recommended to modify
* Long polling mode, the Consumer connection max suspend time, it is not recommended to modify
private long brokerSuspendMaxTimeMillis = 1000 * 20;
* Long polling mode, the Consumer connection timeout(must greater than
* brokerSuspendMaxTimeMillis), it is not recommended to modify
* Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
* recommended to modify
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
......@@ -86,62 +85,108 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
private int maxReconsumeTimes = 16;
public DefaultMQPullConsumer() {
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
public DefaultMQPullConsumer(final String consumerGroup) {
this(consumerGroup, null);
this(null, consumerGroup, null);
public DefaultMQPullConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
this(null, consumerGroup, rpcHook);
public DefaultMQPullConsumer(final String namespace, final String consumerGroup) {
this(namespace, consumerGroup, null);
* Constructor specifying namespace, consumer group and RPC hook.
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
createTopic(key, withNamespace(newTopic), queueNum, 0);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
return this.defaultMQPullConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.maxOffset(mq);
return this.defaultMQPullConsumerImpl.maxOffset(queueWithNamespace(mq));
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.minOffset(mq);
return this.defaultMQPullConsumerImpl.minOffset(queueWithNamespace(mq));
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
return this.defaultMQPullConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
......@@ -156,6 +201,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
return brokerSuspendMaxTimeMillis;
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) {
this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
......@@ -205,28 +254,41 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
public void setRegisterTopics(Set<String> registerTopics) {
this.registerTopics = registerTopics;
this.registerTopics = withNamespace(registerTopics);
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic);
return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
public void start() throws MQClientException {
this.setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
......@@ -238,7 +300,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
public void registerMessageQueueListener(String topic, MessageQueueListener listener) {
synchronized (this.registerTopics) {
if (listener != null) {
this.messageQueueListener = listener;
......@@ -248,80 +310,80 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums);
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums);
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout);
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums);
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout);
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, timeout);
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback);
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout);
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, pullCallback);
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout);
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), messageSelector, offset, maxNums, pullCallback, timeout);
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums);
return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums);
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback);
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset);
this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, fromStore);
return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore);
public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic);
return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(withNamespace(topic));
......@@ -333,23 +395,36 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
} catch (Exception e) {
// Ignore
return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, uniqKey);
return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(withNamespace(topic), uniqKey);
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public OffsetStore getOffsetStore() {
return offsetStore;
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
return defaultMQPullConsumerImpl;
......@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -160,7 +161,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Max consumer thread number
private int consumeThreadMax = 64;
private int consumeThreadMax = 20;
* Threshold for dynamic adjustment of the number of thread pool
......@@ -262,7 +263,47 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Default constructor.
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
* Constructor specifying consumer group.
* @param consumerGroup Consumer group.
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
* Constructor specifying namespace and consumer group.
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consumer group.
public DefaultMQPushConsumer(final String namespace, final String consumerGroup) {
this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely());
* Constructor specifying RPC hook.
* @param rpcHook RPC hook to execute before each remoting command.
public DefaultMQPushConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
* Constructor specifying namespace, consumer group and RPC hook .
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this(namespace, consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
......@@ -274,48 +315,25 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy);
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
* Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm.
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
traceDispatcher = dispatcher;
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
* Constructor specifying RPC hook.
* @param rpcHook RPC hook to execute before each remoting command.
public DefaultMQPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
* Constructor specifying consumer group and enabled msg trace flag.
......@@ -323,7 +341,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param enableMsgTrace Switch flag instance for message trace.
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
......@@ -334,60 +352,131 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
* Constructor specifying consumer group.
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
* @param consumerGroup Consumer group.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy, enableMsgTrace, customizedTraceTopic);
* Constructor specifying namespace, consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
traceDispatcher = dispatcher;
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
createTopic(key, withNamespace(newTopic), queueNum, 0);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
return this.defaultMQPushConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(mq);
return this.defaultMQPushConsumerImpl.maxOffset(queueWithNamespace(mq));
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.minOffset(mq);
return this.defaultMQPushConsumerImpl.minOffset(queueWithNamespace(mq));
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
return this.defaultMQPushConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......@@ -397,7 +486,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
} catch (Exception e) {
// Ignore
return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId);
return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(withNamespace(topic), msgId);
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
......@@ -456,6 +545,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeThreadMin = consumeThreadMin;
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
return defaultMQPushConsumerImpl;
......@@ -528,13 +621,24 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
return subscription;
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public void setSubscription(Map<String, String> subscription) {
this.subscription = subscription;
Map<String, String> subscriptionWithNamespace = new HashMap<String, String>();
for (String topic : subscription.keySet()) {
subscriptionWithNamespace.put(withNamespace(topic), subscription.get(topic));
this.subscription = subscriptionWithNamespace;
* Send message back to broker which will be re-delivered in future.
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
* @param msg Message to send back.
* @param delayLevel delay level.
* @throws RemotingException if there is any network-tier error.
......@@ -542,9 +646,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
......@@ -552,6 +658,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Send message back to the broker whose name is <code>brokerName</code> and the message will be re-delivered in
* future.
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
* @param msg Message to send back.
* @param delayLevel delay level.
* @param brokerName broker name.
......@@ -560,15 +669,17 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
......@@ -578,10 +689,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
......@@ -638,7 +750,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
......@@ -650,7 +762,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), fullClassName, filterClassSource);
......@@ -663,7 +775,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector);
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), messageSelector);
......@@ -702,10 +814,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public OffsetStore getOffsetStore() {
return offsetStore;
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
......@@ -29,6 +29,7 @@ public class ConsumeMessageContext {
private String status;
private Object mqTraceContext;
private Map<String, String> props;
private String namespace;
public String getConsumerGroup() {
return consumerGroup;
......@@ -85,4 +86,12 @@ public class ConsumeMessageContext {
public void setStatus(String status) {
this.status = status;
public String getNamespace() {
return namespace;
public void setNamespace(String namespace) {
this.namespace = namespace;
......@@ -37,6 +37,7 @@ public class SendMessageContext {
private Map<String, String> props;
private DefaultMQProducerImpl producer;
private MessageType msgType = MessageType.Normal_Msg;
private String namespace;
public MessageType getMsgType() {
return msgType;
......@@ -133,4 +134,12 @@ public class SendMessageContext {
public void setBornHost(String bornHost) {
this.bornHost = bornHost;
public String getNamespace() {
return namespace;
public void setNamespace(String namespace) {
this.namespace = namespace;
......@@ -16,19 +16,21 @@
package org.apache.rocketmq.client.impl;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
......@@ -41,6 +43,7 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestH
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
......@@ -91,6 +94,10 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -35,6 +36,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -136,7 +138,7 @@ public class MQAdminImpl {
if (topicRouteData != null) {
TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
if (topicPublishInfo != null && topicPublishInfo.ok()) {
return topicPublishInfo.getMessageQueueList();
return parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
} catch (Exception e) {
......@@ -146,6 +148,16 @@ public class MQAdminImpl {
throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
public List<MessageQueue> parsePublishMessageQueues(List<MessageQueue> messageQueueList) {
List<MessageQueue> resultQueues = new ArrayList<MessageQueue>();
for (MessageQueue queue : messageQueueList) {
String userTopic = NamespaceUtil.withoutNamespace(queue.getTopic(), this.mQClientFactory.getClientConfig().getNamespace());
resultQueues.add(new MessageQueue(userTopic, queue.getBrokerName(), queue.getQueueId()));
return resultQueues;
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
......@@ -407,6 +419,13 @@ public class MQAdminImpl {
//If namespace not null , reset Topic without namespace.
for (MessageExt messageExt : messageList) {
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.mQClientFactory.getClientConfig().getNamespace()));
if (!messageList.isEmpty()) {
return new QueryResult(indexLastUpdateTimestamp, messageList);
} else {
......@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
......@@ -56,6 +58,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
......@@ -522,7 +525,13 @@ public class MQClientAPIImpl {
SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) {
......@@ -665,6 +674,10 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: {
ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true);
//If namespace not null , reset Topic without namespace.
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.clientConfig.getNamespace()));
return messageExt;
......@@ -29,6 +29,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
......@@ -39,13 +40,12 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
......@@ -157,7 +157,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq);
this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);
final long beginTime = System.currentTimeMillis();
......@@ -236,15 +236,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
public void resetRetryTopic(final List<MessageExt> msgs) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
......@@ -326,6 +317,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();
// Wrap topic with namespace before sending back message.
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
......@@ -392,10 +385,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setProps(new HashMap<String, String>());
......@@ -408,7 +403,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
......@@ -26,6 +26,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
......@@ -37,6 +39,7 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -142,6 +145,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq);
this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);
final long beginTime = System.currentTimeMillis();
log.info("consumeMessageDirectly receive new message: {}", msg);
......@@ -380,6 +385,14 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
return false;
public void resetNamespace(final List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
......@@ -439,6 +452,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
......@@ -449,6 +463,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
consumeMessageContext = new ConsumeMessageContext();
......@@ -50,6 +50,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -125,7 +126,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return mqResult;
return parseSubscribeMessageQueues(mqResult);
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
......@@ -135,7 +136,23 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
// check if has info in memory, otherwise invoke api.
Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
if (null == result) {
result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
return parseSubscribeMessageQueues(result);
public Set<MessageQueue> parseSubscribeMessageQueues(Set<MessageQueue> queueSet) {
Set<MessageQueue> resultQueues = new HashSet<MessageQueue>();
for (MessageQueue messageQueue : queueSet) {
String userTopic = NamespaceUtil.withoutNamespace(messageQueue.getTopic(),
resultQueues.add(new MessageQueue(userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId()));
return resultQueues;
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
......@@ -244,9 +261,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
//If namespace not null , reset Topic without namespace.
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
......@@ -259,6 +279,20 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return pullResult;
public void resetTopic(List<MessageExt> msgList) {
if (null == msgList || msgList.size() == 0) {
//If namespace not null , reset Topic without namespace.
for (MessageExt messageExt : msgList) {
if (null != this.getDefaultMQPullConsumer().getNamespace()) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.defaultMQPullConsumer.getNamespace()));
public void subscriptionAutomatically(final String topic) {
if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
try {
......@@ -474,8 +508,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void onSuccess(PullResult pullResult) {
.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
PullResult userPullResult = DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
......@@ -558,6 +593,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPullConsumer.getNamespace()));
......@@ -27,6 +27,8 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
......@@ -56,6 +58,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -169,7 +172,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
throw new MQClientException("The topic[" + topic + "] not exist", null);
return result;
return parseSubscribeMessageQueues(result);
public Set<MessageQueue> parseSubscribeMessageQueues(Set<MessageQueue> messageQueueList) {
Set<MessageQueue> resultQueues = new HashSet<MessageQueue>();
for (MessageQueue queue : messageQueueList) {
String userTopic = NamespaceUtil.withoutNamespace(queue.getTopic(), this.defaultMQPushConsumer.getNamespace());
resultQueues.add(new MessageQueue(userTopic, queue.getBrokerName(), queue.getQueueId()));
return resultQueues;
public DefaultMQPushConsumer getDefaultMQPushConsumer() {
......@@ -517,6 +530,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
......@@ -1131,6 +1146,20 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return queueTimeSpan;
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
public ConsumeMessageService getConsumeMessageService() {
return consumeMessageService;
......@@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
......@@ -63,6 +65,7 @@ import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
......@@ -362,6 +365,26 @@ public class MQClientInstance {
* @param offsetTable
* @param namespace
* @return newOffsetTable
public Map<MessageQueue, Long> parseOffsetTableFromBroker(Map<MessageQueue, Long> offsetTable, String namespace) {
HashMap<MessageQueue, Long> newOffsetTable = new HashMap<MessageQueue, Long>();
if (StringUtils.isNotEmpty(namespace)) {
for (Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
MessageQueue queue = entry.getKey();
queue.setTopic(NamespaceUtil.withoutNamespace(queue.getTopic(), namespace));
newOffsetTable.put(queue, entry.getValue());
} else {
return newOffsetTable;
* Remove offline broker
......@@ -1220,4 +1243,8 @@ public class MQClientInstance {
public NettyClientConfig getNettyClientConfig() {
return nettyClientConfig;
public ClientConfig getClientConfig() {
return clientConfig;
......@@ -73,6 +73,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
......@@ -543,6 +544,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
......@@ -699,6 +704,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
topicWithNamespace = true;
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
......@@ -732,6 +743,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
......@@ -774,13 +786,24 @@ public class DefaultMQProducerImpl implements MQProducerInner {
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
......@@ -846,6 +869,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw e;
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
......@@ -1059,7 +1083,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
List<MessageQueue> messageQueueList =
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
......@@ -1323,4 +1353,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.producer;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.RPCHook;
public class TransactionMQProducer extends DefaultMQProducer {
......@@ -35,11 +36,19 @@ public class TransactionMQProducer extends DefaultMQProducer {
public TransactionMQProducer(final String producerGroup) {
this(null, producerGroup, null);
public TransactionMQProducer(final String namespace, final String producerGroup) {
this(namespace, producerGroup, null);
public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
super(producerGroup, rpcHook);
this(null, producerGroup, rpcHook);
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
super(namespace, producerGroup, rpcHook);
......@@ -66,6 +75,7 @@ public class TransactionMQProducer extends DefaultMQProducer {
throw new MQClientException("localTransactionBranchCheckListener is null", null);
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
......@@ -16,6 +16,21 @@
package org.apache.rocketmq.client.trace;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
......@@ -34,21 +49,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.UUID;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
public class AsyncTraceDispatcher implements TraceDispatcher {
......@@ -58,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private final int batchSize;
private final int maxMsgSize;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecuter;
private final ThreadPoolExecutor traceExecutor;
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
......@@ -72,9 +72,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL;
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
......@@ -87,16 +87,24 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
this.traceExecuter = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
public AccessChannel getAccessChannel() {
return accessChannel;
public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
public String getTraceTopicName() {
return traceTopicName;
......@@ -125,12 +133,13 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.hostConsumer = hostConsumer;
public void start(String nameSrvAddr) throws MQClientException {
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
......@@ -176,7 +185,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void shutdown() {
this.stopped = true;
if (isStarted.get()) {
......@@ -233,7 +242,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
......@@ -266,8 +275,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
// Topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic();
String regionId = context.getRegionId();
// Use original message entity's topic as key
String key = topic;
if (!StringUtils.isBlank(regionId)) {
key = key + TraceConstants.CONTENT_SPLITOR + regionId;
List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<TraceTransferBean>();
......@@ -277,14 +290,21 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
String dataTopic = entry.getKey();
String regionId = null;
if (key.length > 1) {
dataTopic = key[0];
regionId = key[1];
flushData(entry.getValue(), dataTopic, regionId);
* Batch sending data actually
private void flushData(List<TraceTransferBean> transBeanList) {
private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
if (transBeanList.size() == 0) {
......@@ -300,7 +320,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
// Ensure that the size of the package should not exceed the upper limit.
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString());
sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
// Clear temporary buffer after finishing
buffer.delete(0, buffer.length());
......@@ -308,7 +328,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString());
sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
......@@ -317,19 +337,22 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
* Send message trace data
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch
* @param data the message trace data in this batch
private void sendTraceDataByMQ(Set<String> keySet, final String data) {
String topic = traceTopicName;
final Message message = new Message(topic, data.getBytes());
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName;
if (AccessChannel.CLOUD == accessChannel) {
traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of or original message
try {
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
SendCallback callback = new SendCallback() {
public void onSuccess(SendResult sendResult) {
......@@ -16,10 +16,13 @@
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.MixAll;
public class TraceConstants {
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
......@@ -16,6 +16,7 @@
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;
......@@ -27,7 +28,7 @@ public interface TraceDispatcher {
* Initialize asynchronous transfer data module
void start(String nameSrvAddr) throws MQClientException;
void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
* Append the transfering data
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
......@@ -51,7 +52,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext traceContext = new TraceContext();
List<TraceBean> beans = new ArrayList<TraceBean>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
......@@ -65,7 +66,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceBean traceBean = new TraceBean();
......@@ -96,7 +97,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext subAfterContext = new TraceContext();
......@@ -16,15 +16,16 @@
package org.apache.rocketmq.client.trace.hook;
import java.util.ArrayList;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceType;
import java.util.ArrayList;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class SendMessageTraceHookImpl implements SendMessageHook {
......@@ -50,10 +51,10 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
......@@ -19,7 +19,7 @@
......@@ -37,5 +37,9 @@
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
public class MQVersion {
public static final int CURRENT_VERSION = Version.V4_5_0.ordinal();
public static final int CURRENT_VERSION = Version.V4_5_1.ordinal();
public static String getVersionDesc(int value) {
int length = Version.values().length;
......@@ -93,6 +93,7 @@ public class MixAll {
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
......@@ -161,6 +161,10 @@ public class Message implements Serializable {
this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
public void setInstanceId(String instanceId) {
this.putProperty(MessageConst.PROPERTY_INSTANCE_ID, instanceId);
public int getFlag() {
return flag;
......@@ -44,6 +44,7 @@ public class MessageConst {
public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
public static final String KEY_SEPARATOR = " ";
......@@ -72,5 +73,6 @@ public class MessageConst {
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
public class NamespaceUtil {
public static final char NAMESPACE_SEPARATOR = '%';
public static final String STRING_BLANK = "";
public static final int RETRY_PREFIX_LENGTH = MixAll.RETRY_GROUP_TOPIC_PREFIX.length();
public static final int DLQ_PREFIX_LENGTH = MixAll.DLQ_GROUP_TOPIC_PREFIX.length();
* Unpack namespace from resource, just like:
* (1) MQ_INST_XX%Topic_XXX --> Topic_XXX
* @param resourceWithNamespace, topic/groupId with namespace.
* @return topic/groupId without namespace.
public static String withoutNamespace(String resourceWithNamespace) {
if (StringUtils.isEmpty(resourceWithNamespace) || isSystemResource(resourceWithNamespace)) {
return resourceWithNamespace;
StringBuffer strBuffer = new StringBuffer();
if (isRetryTopic(resourceWithNamespace)) {
if (isDLQTopic(resourceWithNamespace)) {
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithNamespace);
int index = resourceWithoutRetryAndDLQ.indexOf(NAMESPACE_SEPARATOR);
if (index > 0) {
String resourceWithoutNamespace = resourceWithoutRetryAndDLQ.substring(index + 1);
return strBuffer.append(resourceWithoutNamespace).toString();
return resourceWithNamespace;
* If resource contains the namespace, unpack namespace from resource, just like:
* (1) (MQ_INST_XX1%Topic_XXX1, MQ_INST_XX1) --> Topic_XXX1
* (2) (MQ_INST_XX2%Topic_XXX2, NULL) --> MQ_INST_XX2%Topic_XXX2
* @param resourceWithNamespace, topic/groupId with namespace.
* @param namespace, namespace to be unpacked.
* @return topic/groupId without namespace.
public static String withoutNamespace(String resourceWithNamespace, String namespace) {
if (StringUtils.isEmpty(resourceWithNamespace) || StringUtils.isEmpty(namespace)) {
return resourceWithNamespace;
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithNamespace);
if (resourceWithoutRetryAndDLQ.startsWith(namespace + NAMESPACE_SEPARATOR)) {
return withoutNamespace(resourceWithNamespace);
return resourceWithNamespace;
public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {
if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {
return resourceWithOutNamespace;
if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {
return resourceWithOutNamespace;
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
StringBuffer strBuffer = new StringBuffer();
if (isRetryTopic(resourceWithOutNamespace)) {
if (isDLQTopic(resourceWithOutNamespace)) {
return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();
public static boolean isAlreadyWithNamespace(String resource, String namespace) {
if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resource) || isSystemResource(resource)) {
return false;
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resource);
return resourceWithoutRetryAndDLQ.startsWith(namespace + NAMESPACE_SEPARATOR);
public static String wrapNamespaceAndRetry(String namespace, String consumerGroup) {
if (StringUtils.isEmpty(consumerGroup)) {
return null;
return new StringBuffer()
.append(wrapNamespace(namespace, consumerGroup))
public static String getNamespaceFromResource(String resource) {
if (StringUtils.isEmpty(resource) || isSystemResource(resource)) {
String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resource);
int index = resourceWithoutRetryAndDLQ.indexOf(NAMESPACE_SEPARATOR);
return index > 0 ? resourceWithoutRetryAndDLQ.substring(0, index) : STRING_BLANK;
private static String withOutRetryAndDLQ(String originalResource) {
if (StringUtils.isEmpty(originalResource)) {
if (isRetryTopic(originalResource)) {
return originalResource.substring(RETRY_PREFIX_LENGTH);
if (isDLQTopic(originalResource)) {
return originalResource.substring(DLQ_PREFIX_LENGTH);
return originalResource;
private static boolean isSystemResource(String resource) {
if (StringUtils.isEmpty(resource)) {
return false;
if (MixAll.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) {
return true;
return MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC.equals(resource);
public static boolean isRetryTopic(String resource) {
return StringUtils.isNotBlank(resource) && resource.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
public static boolean isDLQTopic(String resource) {
return StringUtils.isNotBlank(resource) && resource.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX);
\ No newline at end of file
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
package org.apache.rocketmq.common.utils;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
public class NameServerAddressUtils {
public static final String INSTANCE_PREFIX = "MQ_INST_";
public static final String INSTANCE_REGEX = INSTANCE_PREFIX + "\\w+_\\w+";
public static final String ENDPOINT_PREFIX = "http://";
public static final Pattern NAMESRV_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + ".*");
public static final Pattern INST_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + INSTANCE_REGEX + "\\..*");
public static String getNameServerAddresses() {
return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
public static boolean validateInstanceEndpoint(String endpoint) {
return INST_ENDPOINT_PATTERN.matcher(endpoint).matches();
public static String parseInstanceIdFromEndpoint(String endpoint) {
if (StringUtils.isEmpty(endpoint)) {
return null;
return endpoint.substring(ENDPOINT_PREFIX.length(), endpoint.indexOf('.'));
package org.apache.rocketmq.common;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.junit.Test;
import java.io.File;
import java.io.PrintWriter;
import java.lang.reflect.Method;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ConfigManagerTest {
private static final String PATH_FILE = System.getProperty("java.io.tmpdir") + File.separator + "org.apache.rocketmq.common.ConfigManagerTest";
private static final String CONTENT_ENCODE = "Encode content for ConfigManager";
public void testLoad() throws Exception {
ConfigManager testConfigManager = buildTestConfigManager();
File file = createAndWriteFile(testConfigManager.configFilePath());
File fileBak = createAndWriteFile(testConfigManager.configFilePath() + ".bak");
public void testLoadBak() throws Exception {
ConfigManager testConfigManager = buildTestConfigManager();
File file = createAndWriteFile(testConfigManager.configFilePath() + ".bak");
// invoke private method "loadBak()"
Method declaredMethod = ConfigManager.class.getDeclaredMethod("loadBak");
Boolean loadBakResult = (Boolean) declaredMethod.invoke(testConfigManager);
Boolean loadBakResult2 = (Boolean) declaredMethod.invoke(testConfigManager);
public void testPersist() throws Exception {
ConfigManager testConfigManager = buildTestConfigManager();
File file = new File(testConfigManager.configFilePath());
assertEquals(CONTENT_ENCODE, MixAll.file2String(file));
private ConfigManager buildTestConfigManager() {
return new ConfigManager() {
public String encode() {
return encode(false);
public String configFilePath() {
return PATH_FILE;
public void decode(String jsonString) {
public String encode(boolean prettyFormat) {
private File createAndWriteFile(String fileName) throws Exception {
File file = new File(fileName);
if (file.exists()) {
PrintWriter out = new PrintWriter(fileName);
return file;
\ No newline at end of file
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
* CountDownLatch2 Unit Test
* @see CountDownLatch2
public class CountDownLatch2Test {
* test constructor with invalid init param
* @see CountDownLatch2#CountDownLatch2(int)
public void testConstructorError() {
int count = -1;
try {
CountDownLatch2 latch = new CountDownLatch2(count);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("count < 0"));
* test constructor with valid init param
* @see CountDownLatch2#CountDownLatch2(int)
public void testConstructor() {
int count = 10;
CountDownLatch2 latch = new CountDownLatch2(count);
assertEquals("Expected equal", count, latch.getCount());
assertThat("Expected contain", latch.toString(), containsString("[Count = " + count + "]"));
* test await timeout
* @see CountDownLatch2#await(long, TimeUnit)
public void testAwaitTimeout() throws InterruptedException {
int count = 1;
CountDownLatch2 latch = new CountDownLatch2(count);
boolean await = latch.await(10, TimeUnit.MILLISECONDS);
assertFalse("Expected false", await);
boolean await2 = latch.await(10, TimeUnit.MILLISECONDS);
assertTrue("Expected true", await2);
* test reset
* @see CountDownLatch2#countDown()
@Test(timeout = 1000)
public void testCountDownAndGetCount() throws InterruptedException {
int count = 2;
CountDownLatch2 latch = new CountDownLatch2(count);
assertEquals("Expected equal", count, latch.getCount());
assertEquals("Expected equal", count - 1, latch.getCount());
assertEquals("Expected equal", 0, latch.getCount());
* test reset
* @see CountDownLatch2#reset()
public void testReset() throws InterruptedException {
int count = 2;
CountDownLatch2 latch = new CountDownLatch2(count);
assertEquals("Expected equal", count - 1, latch.getCount());
assertEquals("Expected equal", count, latch.getCount());
assertEquals("Expected equal", 0, latch.getCount());
// coverage Sync#tryReleaseShared, c==0
assertEquals("Expected equal", 0, latch.getCount());
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class ServiceThreadTest {
public void testShutdown() {
shutdown(false, false);
shutdown(false, true);
shutdown(true, false);
shutdown(true, true);
public void testStop() {
public void testMakeStop() {
ServiceThread testServiceThread = startTestServiceThread();
assertEquals(true, testServiceThread.isStopped());
public void testWakeup() {
ServiceThread testServiceThread = startTestServiceThread();
assertEquals(true, testServiceThread.hasNotified.get());
assertEquals(0, testServiceThread.waitPoint.getCount());
public void testWaitForRunning() {
ServiceThread testServiceThread = startTestServiceThread();
// test waitForRunning
assertEquals(false, testServiceThread.hasNotified.get());
assertEquals(1, testServiceThread.waitPoint.getCount());
// test wake up
assertEquals(true, testServiceThread.hasNotified.get());
assertEquals(0, testServiceThread.waitPoint.getCount());
// repeat waitForRunning
assertEquals(false, testServiceThread.hasNotified.get());
assertEquals(0, testServiceThread.waitPoint.getCount());
// repeat waitForRunning again
assertEquals(false, testServiceThread.hasNotified.get());
assertEquals(1, testServiceThread.waitPoint.getCount());
private ServiceThread startTestServiceThread() {
return startTestServiceThread(false);
private ServiceThread startTestServiceThread(boolean daemon) {
ServiceThread testServiceThread = new ServiceThread() {
public void run() {
private void doNothing() {}
public String getServiceName() {
return "TestServiceThread";
// test start
assertEquals(false, testServiceThread.isStopped());
return testServiceThread;
public void shutdown(boolean daemon, boolean interrupt) {
ServiceThread testServiceThread = startTestServiceThread(daemon);
shutdown0(interrupt, testServiceThread);
// repeat
shutdown0(interrupt, testServiceThread);
private void shutdown0(boolean interrupt, ServiceThread testServiceThread) {
if (interrupt) {
} else {
assertEquals(true, testServiceThread.isStopped());
assertEquals(true, testServiceThread.hasNotified.get());
assertEquals(0, testServiceThread.waitPoint.getCount());
public void stop(boolean interrupt) {
ServiceThread testServiceThread = startTestServiceThread();
stop0(interrupt, testServiceThread);
// repeat
stop0(interrupt, testServiceThread);
private void stop0(boolean interrupt, ServiceThread testServiceThread) {
if (interrupt) {
} else {
assertEquals(true, testServiceThread.isStopped());
assertEquals(true, testServiceThread.hasNotified.get());
assertEquals(0, testServiceThread.waitPoint.getCount());
* Copyright (C) 2010-2016 Alibaba Group Holding Limited
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.MixAll;
import org.junit.Assert;
import org.junit.Test;
* @author MQDevelopers
public class NamespaceUtilTest {
private static final String INSTANCE_ID = "MQ_INST_XXX";
private static final String INSTANCE_ID_WRONG = "MQ_INST_XXX1";
private static final String TOPIC = "TOPIC_XXX";
private static final String GROUP_ID = "GID_XXX";
private static final String SYSTEM_TOPIC = "rmq_sys_topic";
private static final String RETRY_TOPIC = MixAll.RETRY_GROUP_TOPIC_PREFIX + GROUP_ID;
private static final String RETRY_TOPIC_WITH_NAMESPACE =
private static final String DLQ_TOPIC = MixAll.DLQ_GROUP_TOPIC_PREFIX + GROUP_ID;
private static final String DLQ_TOPIC_WITH_NAMESPACE =
public void testWithoutNamespace() {
String topic = NamespaceUtil.withoutNamespace(TOPIC_WITH_NAMESPACE, INSTANCE_ID);
Assert.assertEquals(topic, TOPIC);
String topic1 = NamespaceUtil.withoutNamespace(TOPIC_WITH_NAMESPACE);
Assert.assertEquals(topic1, TOPIC);
String groupId = NamespaceUtil.withoutNamespace(GROUP_ID_WITH_NAMESPACE, INSTANCE_ID);
Assert.assertEquals(groupId, GROUP_ID);
String groupId1 = NamespaceUtil.withoutNamespace(GROUP_ID_WITH_NAMESPACE);
Assert.assertEquals(groupId1, GROUP_ID);
String consumerId = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE, INSTANCE_ID);
Assert.assertEquals(consumerId, RETRY_TOPIC);
String consumerId1 = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE);
Assert.assertEquals(consumerId1, RETRY_TOPIC);
String consumerId2 = NamespaceUtil.withoutNamespace(RETRY_TOPIC_WITH_NAMESPACE, INSTANCE_ID_WRONG);
Assert.assertEquals(consumerId2, RETRY_TOPIC_WITH_NAMESPACE);
Assert.assertNotEquals(consumerId2, RETRY_TOPIC);
public void testWrapNamespace() {
String topic1 = NamespaceUtil.wrapNamespace(INSTANCE_ID, TOPIC);
Assert.assertEquals(topic1, TOPIC_WITH_NAMESPACE);
String topicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, topic1);
Assert.assertEquals(topicWithNamespaceAgain, TOPIC_WITH_NAMESPACE);
//Wrap retry topic
String retryTopicWithNamespace = NamespaceUtil.wrapNamespace(INSTANCE_ID, RETRY_TOPIC);
Assert.assertEquals(retryTopicWithNamespace, RETRY_TOPIC_WITH_NAMESPACE);
String retryTopicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, retryTopicWithNamespace);
Assert.assertEquals(retryTopicWithNamespaceAgain, retryTopicWithNamespace);
//Wrap DLQ topic
String dlqTopicWithNamespace = NamespaceUtil.wrapNamespace(INSTANCE_ID, DLQ_TOPIC);
Assert.assertEquals(dlqTopicWithNamespace, DLQ_TOPIC_WITH_NAMESPACE);
String dlqTopicWithNamespaceAgain = NamespaceUtil.wrapNamespace(INSTANCE_ID, dlqTopicWithNamespace);
Assert.assertEquals(dlqTopicWithNamespaceAgain, dlqTopicWithNamespace);
Assert.assertEquals(dlqTopicWithNamespaceAgain, DLQ_TOPIC_WITH_NAMESPACE );
//test system topic
String systemTopic = NamespaceUtil.wrapNamespace(INSTANCE_ID, SYSTEM_TOPIC);
Assert.assertEquals(systemTopic, SYSTEM_TOPIC);
public void testGetNamespaceFromResource(){
String namespaceExpectBlank = NamespaceUtil.getNamespaceFromResource(TOPIC);
Assert.assertEquals(namespaceExpectBlank, NamespaceUtil.STRING_BLANK);
String namespace = NamespaceUtil.getNamespaceFromResource(TOPIC_WITH_NAMESPACE);
Assert.assertEquals(namespace, INSTANCE_ID);
String namespaceFromRetryTopic = NamespaceUtil.getNamespaceFromResource(RETRY_TOPIC_WITH_NAMESPACE);
Assert.assertEquals(namespaceFromRetryTopic, INSTANCE_ID);
\ No newline at end of file
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
public class QueryConsumeTimeSpanBodyTest {
public void testSetGet() throws Exception {
QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody();
List<QueueTimeSpan> firstQueueTimeSpans = newUniqueConsumeTimeSpanSet();
List<QueueTimeSpan> secondQueueTimeSpans = newUniqueConsumeTimeSpanSet();
public void testFromJson() throws Exception {
QueryConsumeTimeSpanBody qctsb = new QueryConsumeTimeSpanBody();
List<QueueTimeSpan> queueTimeSpans = new ArrayList<QueueTimeSpan>();
QueueTimeSpan queueTimeSpan = new QueueTimeSpan();
MessageQueue messageQueue = new MessageQueue("topicName", "brokerName", 1);
String json = RemotingSerializable.toJson(qctsb, true);
QueryConsumeTimeSpanBody fromJson = RemotingSerializable.fromJson(json, QueryConsumeTimeSpanBody.class);
public void testFromJsonRandom() throws Exception {
QueryConsumeTimeSpanBody origin = new QueryConsumeTimeSpanBody();
List<QueueTimeSpan> queueTimeSpans = newUniqueConsumeTimeSpanSet();
String json = origin.toJson(true);
QueryConsumeTimeSpanBody fromJson = RemotingSerializable.fromJson(json, QueryConsumeTimeSpanBody.class);
public void testEncode() throws Exception {
QueryConsumeTimeSpanBody origin = new QueryConsumeTimeSpanBody();
List<QueueTimeSpan> queueTimeSpans = newUniqueConsumeTimeSpanSet();
byte[] data = origin.encode();
QueryConsumeTimeSpanBody fromData = RemotingSerializable.decode(data, QueryConsumeTimeSpanBody.class);
private List<QueueTimeSpan> newUniqueConsumeTimeSpanSet() {
List<QueueTimeSpan> queueTimeSpans = new ArrayList<QueueTimeSpan>();
QueueTimeSpan queueTimeSpan = new QueueTimeSpan();
MessageQueue messageQueue = new MessageQueue(UUID.randomUUID().toString(), UUID.randomUUID().toString(), new Random().nextInt());
return queueTimeSpans;
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
public class BrokerStatsDataTest {
public void testFromJson() throws Exception {
BrokerStatsData brokerStatsData = new BrokerStatsData();
BrokerStatsItem brokerStatsItem = new BrokerStatsItem();
BrokerStatsItem brokerStatsItem = new BrokerStatsItem();
BrokerStatsItem brokerStatsItem = new BrokerStatsItem();
String json = RemotingSerializable.toJson(brokerStatsData, true);
BrokerStatsData brokerStatsDataResult = RemotingSerializable.fromJson(json, BrokerStatsData.class);
assertThat(brokerStatsDataResult.getStatsMinute().getAvgpt()).isCloseTo(brokerStatsData.getStatsMinute().getAvgpt(), within(0.0001));
assertThat(brokerStatsDataResult.getStatsMinute().getTps()).isCloseTo(brokerStatsData.getStatsMinute().getTps(), within(0.0001));
assertThat(brokerStatsDataResult.getStatsHour().getAvgpt()).isCloseTo(brokerStatsData.getStatsHour().getAvgpt(), within(0.0001));
assertThat(brokerStatsDataResult.getStatsHour().getTps()).isCloseTo(brokerStatsData.getStatsHour().getTps(), within(0.0001));
assertThat(brokerStatsDataResult.getStatsDay().getAvgpt()).isCloseTo(brokerStatsData.getStatsDay().getAvgpt(), within(0.0001));
assertThat(brokerStatsDataResult.getStatsDay().getTps()).isCloseTo(brokerStatsData.getStatsDay().getTps(), within(0.0001));
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumeMessageDirectlyResultTest {
public void testFromJson() throws Exception {
ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
boolean defaultAutoCommit = true;
boolean defaultOrder = false;
long defaultSpentTimeMills = 1234567L;
String defaultRemark = "defaultMark";
CMResult defaultCMResult = CMResult.CR_COMMIT;
String json = RemotingSerializable.toJson(result, true);
ConsumeMessageDirectlyResult fromJson = RemotingSerializable.fromJson(json, ConsumeMessageDirectlyResult.class);
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class QueryConsumeQueueResponseBodyTest {
public void test(){
QueryConsumeQueueResponseBody body = new QueryConsumeQueueResponseBody();
SubscriptionData subscriptionData = new SubscriptionData();
ConsumeQueueData data = new ConsumeQueueData();
data.setMsg("this is default msg");
List<ConsumeQueueData> list = new ArrayList<ConsumeQueueData>();
body.setFilterData("default filter data");
String json = RemotingSerializable.toJson(body, true);
QueryConsumeQueueResponseBody fromJson = RemotingSerializable.fromJson(json, QueryConsumeQueueResponseBody.class);
//test ConsumeQueue
ConsumeQueueData jsonData = fromJson.getQueueData().get(0);
assertThat(jsonData.getMsg()).isEqualTo("this is default msg");
//test QueryConsumeQueueResponseBody
assertThat(fromJson.getFilterData()).isEqualTo("default filter data");
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol.heartbeat;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.assertj.core.util.Sets;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class SubscriptionDataTest {
public void testConstructor1() {
SubscriptionData subscriptionData = new SubscriptionData();
public void testConstructor2() {
SubscriptionData subscriptionData = new SubscriptionData("TOPICA", "*");
public void testHashCodeNotEquals() {
SubscriptionData subscriptionData = new SubscriptionData("TOPICA", "*");
subscriptionData.setCodeSet(Sets.newLinkedHashSet(1, 2, 3));
subscriptionData.setTagsSet(Sets.newLinkedHashSet("TAGA", "TAGB", "TAG3"));
public void testFromJson() throws Exception {
SubscriptionData subscriptionData = new SubscriptionData("TOPICA", "*");
subscriptionData.setCodeSet(Sets.newLinkedHashSet(1, 2, 3));
subscriptionData.setTagsSet(Sets.newLinkedHashSet("TAGA", "TAGB", "TAG3"));
String json = RemotingSerializable.toJson(subscriptionData, true);
SubscriptionData fromJson = RemotingSerializable.fromJson(json, SubscriptionData.class);
public void testCompareTo() {
SubscriptionData subscriptionData = new SubscriptionData("TOPICA", "*");
SubscriptionData subscriptionData1 = new SubscriptionData("TOPICBA", "*");
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
public class TopicRouteDataTest {
public void testTopicRouteDataClone() throws Exception {
TopicRouteData topicRouteData = new TopicRouteData();
QueueData queueData = new QueueData();
List<QueueData> queueDataList = new ArrayList<QueueData>();
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "");
brokerAddrs.put(1L, "");
BrokerData brokerData = new BrokerData();
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
public void testTopicRouteDataJsonSerialize() throws Exception {
TopicRouteData topicRouteData = new TopicRouteData();
QueueData queueData = new QueueData();
List<QueueData> queueDataList = new ArrayList<QueueData>();
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "");
brokerAddrs.put(1L, "");
BrokerData brokerData = new BrokerData();
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
String topicRouteDataJsonStr = RemotingSerializable.toJson(topicRouteData, true);
TopicRouteData topicRouteDataFromJson = RemotingSerializable.fromJson(topicRouteDataJsonStr, TopicRouteData.class);
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.protocol.topic;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
public class OffsetMovedEventTest {
public void testFromJson() throws Exception {
OffsetMovedEvent event = mockOffsetMovedEvent();
String json = event.toJson();
OffsetMovedEvent fromJson = RemotingSerializable.fromJson(json, OffsetMovedEvent.class);
assertEquals(event, fromJson);
public void testFromBytes() throws Exception {
OffsetMovedEvent event = mockOffsetMovedEvent();
byte[] encodeData = event.encode();
OffsetMovedEvent decodeData = RemotingSerializable.decode(encodeData, OffsetMovedEvent.class);
assertEquals(event, decodeData);
private void assertEquals(OffsetMovedEvent srcData, OffsetMovedEvent decodeData) {
private OffsetMovedEvent mockOffsetMovedEvent() {
OffsetMovedEvent event = new OffsetMovedEvent();
event.setMessageQueue(new MessageQueue("test-topic", "test-broker", 0));
return event;
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.common.utils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import static org.junit.Assert.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.*;
import java.lang.reflect.Method;
import java.util.List;
public class IOTinyUtilsTest {
private String testRootDir = System.getProperty("user.home") + File.separator + "iotinyutilstest";
public void init() {
File dir = new File(testRootDir);
if (dir.exists()) {
public void destory() {
File file = new File(testRootDir);
public void testToString() throws Exception {
byte[] b = "testToString".getBytes(RemotingHelper.DEFAULT_CHARSET);
InputStream is = new ByteArrayInputStream(b);
String str = IOTinyUtils.toString(is, null);
assertEquals("testToString", str);
is = new ByteArrayInputStream(b);
str = IOTinyUtils.toString(is, RemotingHelper.DEFAULT_CHARSET);
assertEquals("testToString", str);
is = new ByteArrayInputStream(b);
Reader isr = new InputStreamReader(is, RemotingHelper.DEFAULT_CHARSET);
str = IOTinyUtils.toString(isr);
assertEquals("testToString", str);
public void testCopy() throws Exception {
char[] arr = "testToString".toCharArray();
Reader reader = new CharArrayReader(arr);
Writer writer = new CharArrayWriter();
long count = IOTinyUtils.copy(reader, writer);
assertEquals(arr.length, count);
public void testReadLines() throws Exception {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
StringReader reader = new StringReader(sb.toString());
List<String> lines = IOTinyUtils.readLines(reader);
assertEquals(10, lines.size());
public void testToBufferedReader() throws Exception {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
StringReader reader = new StringReader(sb.toString());
Method method = IOTinyUtils.class.getDeclaredMethod("toBufferedReader", new Class[]{Reader.class});
Object bReader = method.invoke(IOTinyUtils.class, reader);
assertTrue(bReader instanceof BufferedReader);
public void testWriteStringToFile() throws Exception {
File file = new File(testRootDir, "testWriteStringToFile");
IOTinyUtils.writeStringToFile(file, "testWriteStringToFile", RemotingHelper.DEFAULT_CHARSET);
public void testCleanDirectory() throws Exception {
for (int i = 0; i < 10; i++) {
IOTinyUtils.writeStringToFile(new File(testRootDir, "testCleanDirectory" + i), "testCleanDirectory", RemotingHelper.DEFAULT_CHARSET);
File dir = new File(testRootDir);
assertTrue(dir.exists() && dir.isDirectory());
assertTrue(dir.listFiles().length > 0);
IOTinyUtils.cleanDirectory(new File(testRootDir));
assertTrue(dir.listFiles().length == 0);
public void testDelete() throws Exception {
for (int i = 0; i < 10; i++) {
IOTinyUtils.writeStringToFile(new File(testRootDir, "testDelete" + i), "testCleanDirectory", RemotingHelper.DEFAULT_CHARSET);
File dir = new File(testRootDir);
assertTrue(dir.exists() && dir.isDirectory());
assertTrue(dir.listFiles().length > 0);
IOTinyUtils.delete(new File(testRootDir));
public void testCopyFile() throws Exception {
File source = new File(testRootDir, "soruce");
String target = testRootDir + File.separator + "dest";
IOTinyUtils.writeStringToFile(source, "testCopyFile", RemotingHelper.DEFAULT_CHARSET);
IOTinyUtils.copyFile(source.getCanonicalPath(), target);
File dest = new File(target);
......@@ -14,6 +14,8 @@
# limitations under the License.
- 10.10.103.*
- 192.168.0.*
- accessKey: RocketMQ
......@@ -14,6 +14,6 @@
# limitations under the License.
accessKey: rocketmq
accessKey: rocketmq2
secretKey: 12345678
......@@ -20,7 +20,7 @@
<name>rocketmq-distribution ${project.version}</name>
......@@ -69,7 +69,7 @@
......@@ -104,7 +104,7 @@
......@@ -17,18 +17,21 @@
......@@ -17,7 +17,7 @@
......@@ -73,10 +73,14 @@ Broker端对权限的校验逻辑主要分为以下几步:
## 5. 热加载修改后权限控制定义
## 6. 权限控制的使用限制
(1)如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的distribution/conf/plain_acl.yml配置文件中
(2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组
......@@ -12,7 +12,7 @@
- 分区顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
##3 消息过滤
## 3 消息过滤
## 4 消息可靠性
......@@ -18,7 +18,7 @@ The Name Server serves as the provider of routing service. The producer or the c
## 7 Pull Consumer
A type of Consumer, the application pulls messages from brokers by actively invoking the consumer pull message method, and the application has the advantages of controlling the timing and frequency of pulling messages. Once the batch of messages is pulled, user application will initiate consuming process.
## 8 Push Consumer
A type of Consumer. Under this high real-time performance mode, it will push the message to the consumer actively when the Broker receives the data.
A type of Consumer, Under this high real-time performance mode, it will push the message to the consumer actively when the Broker receives the data.
## 9 Producer Group
A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the broker server will contact other producers in the same producer group to commit or rollback the transactional message.
## 10 Consumer Group
# Installation Guides
# Deployment Architectures and Setup Steps
## Cluster Setup
### 1 Single Master mode
This is the simplest but also the riskiest mode, that makes the entire service unavailable once the broker restarts or goes down. Production environments are not recommended, but can be used for local testing and development. Here are the steps to build.
This is the simplest, but also the riskiest mode, that makes the entire service unavailable once the broker restarts or goes down. Production environments are not recommended, but can be used for local testing and development. Here are the steps to build.
**1)Start NameServer**
......@@ -67,7 +69,7 @@ $ nohup sh mqbroker -n -c $ROCKETMQ_HOME/conf/2m-noslave/broker
The boot command shown above is used in the case of a single NameServer. For clusters of multiple NameServer, the address list after the -n argument in the broker boot command is separated by semicolons, for example, 9876;192.161.2: 9876.
The boot command shown above is used in the case of a single NameServer.For clusters of multiple NameServer, the address list after the -n argument in the broker boot command is separated by semicolons, for example, 9876;192.161.2: 9876.
### 3 Multiple Master And Multiple Slave Mode-Asynchronous replication
......@@ -5,7 +5,7 @@ Message publication refers to that a producer sends messages to a topic; Message
## 2 Message Ordering
Message ordering refers to that a group of messages can be consumed orderly as they are published. For example, an order generates three messages: order creation, order payment, and order completion. It only makes sense to consume them in their generated order, but orders can be consumed in parallel at the same time. RocketMQ can strictly guarantee these messages are in order.
Orderly message are divided into global orderly message and partitioned orderly message. Global order means that all messages under a certain topic must be in order, partitioned order only requires each group of messages are consumed orderly.
Orderly message is divided into global orderly message and partitioned orderly message. Global order means that all messages under a certain topic must be in order, partitioned order only requires each group of messages are consumed orderly.
- Global message ordering:
For a given Topic, all messages are published and consumed in strict first-in-first-out (FIFO) order.
Applicable scenario: the performance requirement is not high, and all messages are published and consumed according to FIFO principle strictly.
......@@ -14,7 +14,7 @@ For a given Topic, all messages are partitioned according to sharding key. Messa
Applicable scenario: high performance requirement, with sharding key as the partition field, messages within the same partition are published and consumed according to FIFO principle strictly.
## 3 Message Filter
Consumers of RocketMQ can filter messages based on tags as well as support for user-defined attribute filtering. Message filter is currently implemented on the Broker side, with the advantage of reducing the network transmission of useless messages for Consumer and the disadvantage of increasing the burden on the Broker and relatively complex implementation.
Consumers of RocketMQ can filter messages based on tags as well as supporting for user-defined attribute filtering. Message filter is currently implemented on the Broker side, with the advantage of reducing the network transmission of useless messages for Consumer and the disadvantage of increasing the burden on the Broker and relatively complex implementation.
## 4 Message Reliability
RocketMQ supports high reliability of messages in several situations:
......@@ -25,22 +25,22 @@ RocketMQ supports high reliability of messages in several situations:
5 The machine cannot be started up (the CPU, motherboard, memory and other key equipment may be damaged)
6 Disk equipment damaged
In the four cases of 1), 2), 3), and 4) where the hardware resource can be recovered immediately, RocketMQ guarantees that the message will not be lost or a small amount of data will be lost (depending on whether the flush disk type is synchronous or asynchronous)
In the four cases of 1), 2), 3), and 4) where the hardware resource can be recovered immediately, RocketMQ guarantees that the message will not be lost or a small amount of data will be lost (depending on whether the flush disk type is synchronous or asynchronous).
5) and 6) are single point of failure and cannot be recovered. Once it happens, all messages on the single point will be lost. In both cases, RocketMQ ensures that 99% of the messages are not lost through asynchronous replication, but a very few number of messages may still be lost. Synchronous double write mode can completely avoid single point of failure, which will surely affect the performance and suitable for the occasion of high demand for message reliability, such as money related applications. Note: RocketMQ supports synchronous double writes since version 3.0.
5 ) and 6) are single point of failure and cannot be recovered. Once it happens, all messages on the single point will be lost. In both cases, RocketMQ ensures that 99% of the messages are not lost through asynchronous replication, but a very few number of messages may still be lost. Synchronous double write mode can completely avoid single point of failure, which will surely affect the performance and suitable for the occasion of high demand for message reliability, such as money related applications. Note: RocketMQ supports synchronous double writes since version 3.0.
## 5 At Least Once
At least Once refers to that every message will be delivered at least once. RocketMQ supports this feature because the Consumer pulls the message locally and does not send an ack back to the server until it has consumed it.
## 6 Backtracking Consumption
Backtracking consumption refers to that the Consumer has consumed the message successfully, but the business needs to consume again. To support this function, the message still needs to be retained after the Broker sends the message to the Consumer successfully. The re-consumption is normally based on time dimension. For example, after the recovery of the Consumer system failure, the data one hour ago needs to be re-consumed, then the Broker needs to provide a mechanism to reverse the consumption progress according to the time dimension. RocketMQ supports backtracking consumption by time trace, with the time dimension down to milliseconds.
Backtracking consumption refers to that the Consumer has consumed the message successfully, but the business needs to consume again. To support this function, the message still needs to be retained after the Broker sends the message to the Consumer successfully. The re-consumption is normally based on time dimension. For example, after the recovery of the Consumer system failured, the data one hour ago needs to be re-consumed, then the Broker needs to provide a mechanism to reverse the consumption progress according to the time dimension. RocketMQ supports backtracking consumption by time trace, with the time dimension down to milliseconds.
## 7 Transactional Message
RocketMQ transactional message refers to the fact that the application of a local transaction and the sending of a Message operation can be defined in a global transaction which means both succeed or fail simultaneously. RocketMQ transactional message provides distributed transaction functionality similar to X/Open XA, enabling the ultimate consistency of distributed transactions through transactional message.
RocketMQ transactional message refers to the fact that the application of a local transaction and the sending of a Message operation can be defined in a global transaction which means both succeed or failed simultaneously. RocketMQ transactional message provides distributed transaction functionality similar to X/Open XA, enabling the ultimate consistency of distributed transactions through transactional message.
## 8 Scheduled Message
Scheduled message(delay queue) refers to that messages are not consumed immediately after they are sent to the broker, but waiting to be delivered to the real topic after a specific time.
The broker has a configuration item, messageDelayLevel, with default values “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, 18 levels. Users can configure a custom messageDelayLevel. Note that messageDelayLevel is a broker's property rather than a topic's. When sending a message, just set the delayLevel level: msg.setDelayLevel(level). There are three types of levels:
Scheduled message(delay queue) refers to that messages are not consumed immediately after they are sent to the broker, but waiting to be delivered to the real topic after a specific time.
The broker has a configuration item, `messageDelayLevel`, with default values “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, 18 levels. Users can configure a custom `messageDelayLevel`. Note that `messageDelayLevel` is a broker's property rather than a topic's. When sending a message, just set the delayLevel level: msg.setDelayLevel(level). There are three types of levels:
- level == 0, The message is not a delayed message
- 1<=level<=maxLevel, Message delay specific time, such as level==1, delay for 1s
......@@ -52,24 +52,24 @@ Note that Scheduled messages are counted both the first time they are written an
## 9 Message Retry
When the Consumer fails to consume the message, a retry mechanism is needed to make the message to be consumed again. Consumer's consume failure can usually be classified as follows:
- due to the reasons of the message itself, such as deserialization failure, the message data itself cannot be processed (for example, the phone number of the current message is cancelled and cannot be charged), etc. This kind of error usually requires skipping this message and consuming others since immediately retry would be failed 99%, so it is better to provide a timed retry mechanism that retries after 10 seconds.
- due to the reasons of dependent downstream application services are not available, such as db connection is not usable, perimeter network is not unreachable, etc. When this kind of error is encountered, consuming other messages will also result in an error even if the current failed message is skipped. In this case, it is recommended to sleep for 30s before consuming the next message, which will reduce the pressure on the broker to retry the message.
- Due to the reasons of the message itself, such as deserialization failure, the message data itself cannot be processed (for example, the phone number of the current message is cancelled and cannot be charged), etc. This kind of error usually requires skipping this message and consuming others since immediately retry would be failed 99%, so it is better to provide a timed retry mechanism that retries after 10 seconds.
- Due to the reasons of dependent downstream application services are not available, such as db connection is not usable, perimeter network is not unreachable, etc. When this kind of error is encountered, consuming other messages will also result in an error even if the current failed message is skipped. In this case, it is recommended to sleep for 30s before consuming the next message, which will reduce the pressure on the broker to retry the message.
RocketMQ will set up a retry queue named “%RETRY%+consumerGroup” for each consumer group(Note that the retry queue for this topic is for consumer groups, not for each topic) to temporarily save messages cannot be consumed by customer due to all kinds of reasons. Considering that it takes some time for the exception to recover, multiple retry levels are set for the retry queue, and each retry level has a corresponding re-deliver delay. The more retries, the greater the deliver delay. RocketMQ first save retry messages to the delay queue which topic is named “SCHEDULE_TOPIC_XXXX”, then background schedule task will save the messages to “%RETRY%+consumerGroup” retry queue according to their corresponding delay.
RocketMQ will set up a retry queue named “%RETRY%+consumerGroup” for each consumer group(Note that the retry queue for this topic is for consumer groups, not for each topic) to temporarily save messages cannot be consumed by customer due to all kinds of reasons. Considering that it takes some time for the exception to recover, multiple retry levels are set for the retry queue, and each retry level has a corresponding re-deliver delay. The more retries, the greater the deliver delay. RocketMQ first save retry messages to the delay queue which topic is named “SCHEDULE_TOPIC_XXXX”, then background schedule task will save the messages to “%RETRY%+consumerGroup” retry queue according to their corresponding delay.
## 10 Message Resend
When a producer sends a message, the synchronous message will be resent if fails, the asynchronous message will retry and oneway message is without any guarantee. Message resend ensures that messages are sent successfully and without lost as much as possible, but it can lead to message duplication, which is an unavoidable problem in RocketMQ. Under normal circumstances, message duplication will not occur, but when there is a large number of messages and network jitter, message duplication will be a high-probability event. In addition, producer initiative messages resend and the consumer load changes will also result in duplicate messages. The message retry policy can be set as follows:
- retryTimesWhenSendFailed: Synchronous message retry times when send failed, default value is 2, so the producer will try to send retryTimesWhenSendFailed + 1 times at most. To ensure that the message is not lost, producer will try sending the message to another broker instead of selecting the broker that failed last time. An exception will be thrown if it reaches the retry limit, and the client should guarantee that the message will not be lost. Messages will resend when RemotingException, MQClientException, and partial MQBrokerException occur.
- retryTimesWhenSendAsyncFailed: Asynchronous message retry times when send failed, asynchronous retry sends message to the same broker instead of selecting another one and does not guarantee that the message wont lost.
- retryAnotherBrokerWhenNotStoreOK: Message flush disk (master or slave) timeout or slave not available (return status is not SEND_OK), whether to try to send to another broker, default value is false. Very important messages can set to true.
- `retryTimesWhenSendFailed`: Synchronous message retry times when send failed, default value is 2, so the producer will try to send `retryTimesWhenSendFailed` + 1 times at most. To ensure that the message is not lost, producer will try sending the message to another broker instead of selecting the broker that failed last time. An exception will be thrown if it reaches the retry limit, and the client should guarantee that the message will not be lost. Messages will resend when RemotingException, MQClientException, and partial MQBrokerException occur.
- `retryTimesWhenSendAsyncFailed`: Asynchronous message retry times when send failed, asynchronous retry sends message to the same broker instead of selecting another one and does not guarantee that the message wont lost.
- `retryAnotherBrokerWhenNotStoreOK`: Message flush disk (master or slave) timeout or slave not available (return status is not SEND_OK), whether to try to send to another broker, default value is false. Very important messages can set to true.
## 11 Flow Control
Producer flow control, because broker processing capacity reaches a bottleneck; Consumer flow control, because the consumption capacity reaches a bottleneck.
Producer flow control:
- When commitLog file locked time exceeds osPageCacheBusyTimeOutMills, default value of osPageCacheBusyTimeOutMills is 1000 ms, then return flow control.
- If transientStorePoolEnable == true, and the broker is asynchronous flush disk type, and resources are insufficient in the transientStorePool, reject the current send request and return flow control.
- When commitLog file locked time exceeds osPageCacheBusyTimeOutMills, default value of `osPageCacheBusyTimeOutMills` is 1000 ms, then return flow control.
- If `transientStorePoolEnable` == true, and the broker is asynchronous flush disk type, and resources are insufficient in the transientStorePool, reject the current send request and return flow control.
- The broker checks the head request wait time of the send request queue every 10ms. If the wait time exceeds waitTimeMillsInSendQueue, which default value is 200ms, the current send request is rejected and the flow control is returned.
- The broker implements flow control by rejecting send requests.
......@@ -19,7 +19,7 @@
......@@ -51,12 +51,12 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.example.namespace;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerWithNamespace {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("InstanceTest", "pidTest");
for (int i = 0; i < 100; i++) {
Message message = new Message("topicTest", "tagTest", "Hello world".getBytes());
try {
SendResult result = producer.send(message);
System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());
} catch (Exception e) {
\ No newline at end of file
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.example.namespace;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageQueue;
public class PullConsumerWithNamespace {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws Exception {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("InstanceTest", "cidTest");
Set<MessageQueue> mqs = pullConsumer.fetchSubscribeMessageQueues("topicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the topic: %s, queue: %s%n", mq.getTopic(), mq);
while (true) {
try {
PullResult pullResult =
pullConsumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
case NO_NEW_MSG:
break SINGLE_MQ;
} catch (Exception e) {
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null) {
return offset;
return 0;
private static void dealWithPullResult(PullResult pullResult) {
if (null == pullResult || pullResult.getMsgFoundList().isEmpty()) {
(msg) -> System.out.printf("Topic is:%s, msgId is:%s%n" , msg.getTopic(), msg.getMsgId()));
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
\ No newline at end of file
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.example.namespace;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
public class PushConsumerWithNamespace {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("InstanceTest", "cidTest");
defaultMQPushConsumer.subscribe("topicTest", "*");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
msgs.stream().forEach((msg) -> {
System.out.printf("Msg topic is:%s, MsgId is:%s, reconsumeTimes is:%s%n", msg.getTopic() , msg.getMsgId(), msg.getReconsumeTimes());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
\ No newline at end of file
......@@ -20,7 +20,7 @@
......@@ -19,7 +19,7 @@
......@@ -19,7 +19,7 @@
......@@ -19,7 +19,7 @@
......@@ -20,7 +20,7 @@
......@@ -29,7 +29,7 @@
<name>Apache RocketMQ ${project.version}</name>
......@@ -42,7 +42,7 @@
......@@ -19,7 +19,7 @@
......@@ -19,7 +19,7 @@
......@@ -19,7 +19,7 @@
......@@ -1864,10 +1864,14 @@ public class DefaultMessageStore implements MessageStore {
this.reputFromOffset += size;
} else {
doNext = false;
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset += result.getSize() - readSize;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset += result.getSize() - readSize;
......@@ -20,7 +20,7 @@
......@@ -19,7 +19,7 @@
......@@ -21,14 +21,10 @@ import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -82,6 +78,9 @@ import org.slf4j.LoggerFactory;
public class MQAdminStartup {
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
private static String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
public static void main(String[] args) {
main0(args, null);
......@@ -132,7 +131,7 @@ public class MQAdminStartup {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
cmd.execute(commandLine, options, getAclRPCHook());
cmd.execute(commandLine, options, AclUtils.getAclRPCHook(rocketmqHome + MixAll.ACL_CONF_TOOLS_FILE));
} else {
System.out.printf("The sub command %s not exist.%n", args[0]);
......@@ -203,8 +202,6 @@ public class MQAdminStartup {
private static void initLogback() throws JoranException {
String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
......@@ -245,25 +242,4 @@ public class MQAdminStartup {
public static void initCommand(SubCommand command) {
public static RPCHook getAclRPCHook() {
String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
String fileName = "/conf/tools.yml";
JSONObject yamlDataObject = AclUtils.getYamlDataObject(fileHome + fileName ,
if (yamlDataObject == null || yamlDataObject.isEmpty()) {
System.out.printf(" Cannot find conf file %s, acl is not be enabled.%n" ,fileHome + fileName);
return null;
String accessKey = yamlDataObject.getString("accessKey");
String secretKey = yamlDataObject.getString("secretKey");
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
System.out.printf("AccessKey or secretKey is blank, the acl is not enabled.%n");
return null;
return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
......@@ -36,6 +36,24 @@ import org.apache.rocketmq.tools.command.SubCommandException;
public class BrokerConsumeStatsSubCommad implements SubCommand {
private DefaultMQAdminExt defaultMQAdminExt;
private DefaultMQAdminExt createMQAdminExt(RPCHook rpcHook) throws SubCommandException {
if (this.defaultMQAdminExt != null) {
return defaultMQAdminExt;
} else {
defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
try {
catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
return defaultMQAdminExt;
public String commandName() {
return "brokerConsumeStats";
......@@ -69,10 +87,9 @@ public class BrokerConsumeStatsSubCommad implements SubCommand {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
try {
defaultMQAdminExt = createMQAdminExt(rpcHook);
String brokerAddr = commandLine.getOptionValue('b').trim();
boolean isOrder = false;
long timeoutMillis = 50000;
......@@ -39,6 +39,24 @@ import org.apache.rocketmq.tools.command.SubCommandException;
public class QueryMsgByUniqueKeySubCommand implements SubCommand {
private DefaultMQAdminExt defaultMQAdminExt;
private DefaultMQAdminExt createMQAdminExt(RPCHook rpcHook) throws SubCommandException {
if (this.defaultMQAdminExt != null) {
return defaultMQAdminExt;
} else {
defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
try {
catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
return defaultMQAdminExt;
public static void queryById(final DefaultMQAdminExt admin, final String topic,
final String msgId) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException, IOException {
......@@ -182,11 +200,10 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
try {
defaultMQAdminExt = createMQAdminExt(rpcHook);
final String msgId = commandLine.getOptionValue('i').trim();
final String topic = commandLine.getOptionValue('t').trim();
......@@ -49,6 +49,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class BrokerConsumeStatsSubCommadTest {
private static BrokerConsumeStatsSubCommad cmd = new BrokerConsumeStatsSubCommad();
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
......@@ -81,10 +84,13 @@ public class BrokerConsumeStatsSubCommadTest {
public static void terminate() {
public void testExecute() throws SubCommandException {
BrokerConsumeStatsSubCommad cmd = new BrokerConsumeStatsSubCommad();
public void testExecute() throws SubCommandException, IllegalAccessException, NoSuchFieldException {
Field field = BrokerConsumeStatsSubCommad.class.getDeclaredField("defaultMQAdminExt");
field.set(cmd, defaultMQAdminExt);
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-b", "-t 3000", "-l 5", "-o true"};
final CommandLine commandLine =
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册