未验证 提交 b6307dfc 编写于 作者: J Jun 提交者: GitHub

[ISSUE #1976]System topic should add permission checking globally (#1985)

* fix #1976
System Topic like SCHEDULE_TOPIC_XXXX should not be create or delete by user

* User can not send message to system topic SCHEDULE_TOPIC_XXXX

* the-->The

* fix magic number 18

* move system topic to TopicValidator except TBW102

* move TBW102 and isSystemTopic to TopicValidator

* add test code

* validateTopic --> validatorSystemTopic, validatorBlacklistTopic

* validateTopic --> validatorSystemTopic, validatorBlacklistTopic

* rename some methods
上级 7e756882
......@@ -25,7 +25,7 @@ import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.topic.TopicValidator;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
......@@ -176,6 +176,9 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response;
}
if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
return response;
}
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
......
......@@ -19,17 +19,6 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController;
......@@ -37,8 +26,8 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.broker.topic.TopicValidator;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
......@@ -137,6 +126,18 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
......@@ -252,29 +253,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
String topic = requestHeader.getTopic();
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
if (!TopicValidator.validateTopic(topic, response)) {
return response;
}
try {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
ctx.writeAndFlush(response);
} catch (Exception e) {
log.error("Failed to produce a proper response", e);
if (TopicValidator.isSystemTopic(topic, response)) {
return response;
}
TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
......@@ -285,7 +273,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
return null;
response.setCode(ResponseCode.SUCCESS);
return response;
}
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
......@@ -296,7 +285,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
String topic = requestHeader.getTopic();
if (!TopicValidator.validateTopic(topic, response)) {
return response;
}
if (TopicValidator.isSystemTopic(topic, response)) {
return response;
}
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
......@@ -430,7 +427,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
responseHeader.setBrokerAddr(this.brokerController.getBrokerAddr());
responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
responseHeader.setClusterName(this.brokerController.getBrokerConfig().getBrokerClusterName());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
......@@ -1118,7 +1115,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Set<String> topics = this.brokerController.getTopicConfigManager().getSystemTopic();
Set<String> topics = TopicValidator.getSystemTopicSet();
TopicList topicList = new TopicList();
topicList.setTopicList(topics);
response.setBody(topicList.encode());
......
......@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
......@@ -523,7 +524,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
try {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
msgInner.setTopic(TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT);
msgInner.setTags(event.getConsumerGroup());
msgInner.setDelayTimeLevel(0);
msgInner.setKeys(event.getConsumerGroup());
......
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.broker.topic;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
......@@ -37,18 +36,20 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class TopicConfigManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
private transient final Lock lockTopicConfigTable = new ReentrantLock();
private final ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(1024);
private final DataVersion dataVersion = new DataVersion();
private final Set<String> systemTopicList = new HashSet<String>();
private transient BrokerController brokerController;
public TopicConfigManager() {
......@@ -57,20 +58,18 @@ public class TopicConfigManager extends ConfigManager {
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
{
// MixAll.SELF_TEST_TOPIC
String topic = MixAll.SELF_TEST_TOPIC;
String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
......@@ -81,10 +80,9 @@ public class TopicConfigManager extends ConfigManager {
}
}
{
// MixAll.BENCHMARK_TOPIC
String topic = MixAll.BENCHMARK_TOPIC;
String topic = TopicValidator.RMQ_SYS_BENCHMARK_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1024);
topicConfig.setWriteQueueNums(1024);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
......@@ -93,7 +91,7 @@ public class TopicConfigManager extends ConfigManager {
String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
TopicValidator.addSystemTopic(topic);
int perm = PermName.PERM_INHERIT;
if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
perm |= PermName.PERM_READ | PermName.PERM_WRITE;
......@@ -105,7 +103,7 @@ public class TopicConfigManager extends ConfigManager {
String topic = this.brokerController.getBrokerConfig().getBrokerName();
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
TopicValidator.addSystemTopic(topic);
int perm = PermName.PERM_INHERIT;
if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
perm |= PermName.PERM_READ | PermName.PERM_WRITE;
......@@ -116,19 +114,26 @@ public class TopicConfigManager extends ConfigManager {
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
// MixAll.OFFSET_MOVED_EVENT
String topic = MixAll.OFFSET_MOVED_EVENT;
String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
......@@ -137,21 +142,13 @@ public class TopicConfigManager extends ConfigManager {
{
String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
public boolean isSystemTopic(final String topic) {
return this.systemTopicList.contains(topic);
}
public Set<String> getSystemTopic() {
return this.systemTopicList;
}
public TopicConfig selectTopicConfig(final String topic) {
return this.topicConfigTable.get(topic);
}
......@@ -170,7 +167,7 @@ public class TopicConfigManager extends ConfigManager {
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
}
......@@ -275,7 +272,7 @@ public class TopicConfigManager extends ConfigManager {
}
public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) {
TopicConfig topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
TopicConfig topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
if (topicConfig != null)
return topicConfig;
......@@ -284,18 +281,18 @@ public class TopicConfigManager extends ConfigManager {
try {
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
if (topicConfig != null)
return topicConfig;
topicConfig = new TopicConfig(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
topicConfig = new TopicConfig(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(0);
log.info("create new topic {}", topicConfig);
this.topicConfigTable.put(MixAll.TRANS_CHECK_MAX_TIME_TOPIC, topicConfig);
this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, topicConfig);
createNew = true;
this.dataVersion.nextVersion();
this.persist();
......
......@@ -21,13 +21,13 @@ import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageExtBrokerInner;
......@@ -127,7 +127,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
import java.nio.charset.Charset;
......@@ -25,11 +26,11 @@ public class TransactionalMessageUtil {
public static Charset charset = Charset.forName("utf-8");
public static String buildOpTopic() {
return MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC;
return TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC;
}
public static String buildHalfTopic() {
return MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
}
public static String buildConsumerGroup() {
......
......@@ -16,17 +16,22 @@
*/
package org.apache.rocketmq.broker.processor;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelHandlerContext;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
......@@ -47,6 +52,10 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
......@@ -61,17 +70,31 @@ public class AdminBrokerProcessorTest {
@Spy
private BrokerController
brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(),
new MessageStoreConfig());
brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(),
new MessageStoreConfig());
@Mock
private MessageStore messageStore;
private Set<String> systemTopicSet;
@Before
public void init() {
brokerController.setMessageStore(messageStore);
adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
systemTopicSet = Sets.newHashSet(
TopicValidator.RMQ_SYS_SELF_TEST_TOPIC,
TopicValidator.RMQ_SYS_BENCHMARK_TOPIC,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT,
TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC,
this.brokerController.getBrokerConfig().getBrokerClusterName(),
this.brokerController.getBrokerConfig().getBrokerName(),
this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX);
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
systemTopicSet.add(this.brokerController.getBrokerConfig().getMsgTraceTopicName());
}
}
@Test
......@@ -94,6 +117,67 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
}
@Test
public void testUpdateAndCreateTopic() throws Exception {
//test system topic
for (String topic : systemTopicSet) {
RemotingCommand request = buildCreateTopicRequest(topic);
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic.");
}
//test validate error topic
String topic = "";
RemotingCommand request = buildCreateTopicRequest(topic);
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
topic = "TEST_CREATE_TOPIC";
request = buildCreateTopicRequest(topic);
response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void testDeleteTopic() throws Exception {
//test system topic
for (String topic : systemTopicSet) {
RemotingCommand request = buildDeleteTopicRequest(topic);
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic.");
}
String topic = "TEST_DELETE_TOPIC";
RemotingCommand request = buildDeleteTopicRequest(topic);
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
private RemotingCommand buildCreateTopicRequest(String topic) {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name());
requestHeader.setReadQueueNums(8);
requestHeader.setWriteQueueNums(8);
requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
request.makeCustomHeaderToNet();
return request;
}
private RemotingCommand buildDeleteTopicRequest(String topic) {
DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader);
request.makeCustomHeaderToNet();
return request;
}
private MessageExt createDefaultMessageExt() {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("12345678");
......@@ -106,10 +190,11 @@ public class AdminBrokerProcessorTest {
return messageExt;
}
private SelectMappedBufferResult createSelectMappedBufferResult(){
SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024) ,0, new MappedFile());
private SelectMappedBufferResult createSelectMappedBufferResult() {
SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024), 0, new MappedFile());
return result;
}
private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() {
ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader();
header.setMsgId("C0A803CA00002A9F0000000000031367");
......
......@@ -25,15 +25,14 @@ import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
......@@ -115,7 +114,7 @@ public class ReplyMessageProcessorTest {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(group);
requestHeader.setTopic(topic);
requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC);
requestHeader.setDefaultTopicQueueNums(3);
requestHeader.setQueueId(1);
requestHeader.setSysFlag(0);
......
......@@ -23,7 +23,6 @@ import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
......@@ -32,6 +31,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
......@@ -238,7 +238,7 @@ public class SendMessageProcessorTest {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(group);
requestHeader.setTopic(topic);
requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC);
requestHeader.setDefaultTopicQueueNums(3);
requestHeader.setQueueId(1);
requestHeader.setSysFlag(0);
......
......@@ -19,10 +19,10 @@ package org.apache.rocketmq.broker.transaction.queue;
import java.net.InetSocketAddress;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.MessageExtBrokerInner;
......@@ -92,7 +92,7 @@ public class DefaultTransactionalMessageCheckListenerTest {
@Test
public void testResolveDiscardMsg() {
MessageExt messageExt = new MessageExt();
messageExt.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
messageExt.setTopic(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC);
messageExt.setQueueId(0);
messageExt.setBody("test resolve discard msg".getBytes());
messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 10911));
......
......@@ -20,11 +20,11 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.AppendMessageResult;
......@@ -98,7 +98,7 @@ public class TransactionalMessageBridgeTest {
@Test
public void testFetchMessageQueues() {
Set<MessageQueue> messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
Set<MessageQueue> messageQueues = transactionBridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC);
assertThat(messageQueues.size()).isEqualTo(1);
}
......
......@@ -23,13 +23,13 @@ import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.AppendMessageResult;
......@@ -108,10 +108,10 @@ public class TransactionalMessageServiceImplTest {
@Test
public void testCheck_withDiscard() {
when(bridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(MixAll.RMQ_SYS_TRANS_HALF_TOPIC));
when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createDiscardPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hellp", 1));
when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0));
when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createOpPulResult(MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "10", 1));
when(bridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC));
when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createDiscardPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hellp", 1));
when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0));
when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createOpPulResult(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "10", 1));
long timeOut = this.brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax();
final AtomicInteger checkMessage = new AtomicInteger(0);
......@@ -128,10 +128,10 @@ public class TransactionalMessageServiceImplTest {
@Test
public void testCheck_withCheck() {
when(bridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(MixAll.RMQ_SYS_TRANS_HALF_TOPIC));
when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1));
when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0));
when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0));
when(bridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC));
when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1));
when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0));
when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0));
when(bridge.getBrokerController()).thenReturn(this.brokerController);
when(bridge.renewHalfMessageInner(any(MessageExtBrokerInner.class))).thenReturn(createMessageBrokerInner());
when(bridge.putMessageReturnResult(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
......
......@@ -21,10 +21,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.topic.TopicValidator;
/**
* Common Validator
......@@ -85,6 +85,7 @@ public class Validators {
}
// topic
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
......@@ -116,11 +117,19 @@ public class Validators {
throw new MQClientException(
String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
}
}
public static void isSystemTopic(String topic) throws MQClientException {
if (TopicValidator.isSystemTopic(topic)) {
throw new MQClientException(
String.format("The topic[%s] is conflict with system topic.", topic), null);
}
}
//whether the same with system reserved keyword
if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
public static void isNotAllowedSendTopic(String topic) throws MQClientException {
if (TopicValidator.isNotAllowedSendTopic(topic)) {
throw new MQClientException(
String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null);
String.format("Sending message to topic[%s] is forbidden.", topic), null);
}
}
......
......@@ -82,6 +82,7 @@ public class MQAdminImpl {
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
try {
Validators.checkTopic(newTopic);
Validators.isSystemTopic(newTopic);
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
if (brokerDataList != null && !brokerDataList.isEmpty()) {
......
......@@ -1364,7 +1364,7 @@ public class MQClientAPIImpl {
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
if (allowTopicNotExist) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
......
......@@ -671,7 +671,7 @@ public class MQClientInstance {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
......
......@@ -421,6 +421,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.makeSureStateOK();
Validators.checkTopic(newTopic);
Validators.isSystemTopic(newTopic);
this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
}
......
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -74,7 +75,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Just for testing or demo program
*/
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
/**
* Number of queues to create per default topic.
......
......@@ -41,11 +41,11 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -90,7 +90,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
10, //
......
......@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
public class TraceConstants {
......@@ -24,5 +24,5 @@ public class TraceConstants {
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
public static final String TRACE_TOPIC_PREFIX = TopicValidator.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
}
......@@ -19,11 +19,12 @@ package org.apache.rocketmq.client;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.fail;
public class ValidatorsTest {
......@@ -47,17 +48,6 @@ public class ValidatorsTest {
}
}
@Test
public void testCheckTopic_UseDefaultTopic() {
String defaultTopic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
try {
Validators.checkTopic(defaultTopic);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageStartingWith(String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", defaultTopic));
}
}
@Test
public void testCheckTopic_BlankTopic() {
String blankTopic = "";
......@@ -80,4 +70,30 @@ public class ValidatorsTest {
assertThat(e).hasMessageStartingWith("The specified topic is longer than topic max length");
}
}
@Test
public void testIsSystemTopic() {
for (String topic : TopicValidator.getSystemTopicSet()) {
try {
Validators.isSystemTopic(topic);
fail("excepted MQClientException for system topic");
} catch (MQClientException e) {
assertThat(e.getResponseCode()).isEqualTo(-1);
assertThat(e.getErrorMessage()).isEqualTo(String.format("The topic[%s] is conflict with system topic.", topic));
}
}
}
@Test
public void testIsNotAllowedSendTopic() {
for (String topic : TopicValidator.getNotAllowedSendTopicSet()) {
try {
Validators.isNotAllowedSendTopic(topic);
fail("excepted MQClientException for blacklist topic");
} catch (MQClientException e) {
assertThat(e.getResponseCode()).isEqualTo(-1);
assertThat(e.getErrorMessage()).isEqualTo(String.format("Sending message to topic[%s] is forbidden.", topic));
}
}
}
}
......@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
......@@ -90,7 +91,7 @@ import static org.mockito.Mockito.when;
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String topic = "FooBar";
private String brokerName = "BrokerA";
......
......@@ -31,12 +31,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
......@@ -75,7 +75,7 @@ public class DefaultMQProducerWithTraceTest {
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
......
......@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
......@@ -52,7 +53,7 @@ public class BrokerConfig {
private boolean autoCreateSubscriptionGroup = true;
private String messageStorePlugIn = "";
@ImportantField
private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
private String msgTraceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
@ImportantField
private boolean traceTopicEnable = false;
/**
......
......@@ -55,8 +55,6 @@ public class MixAll {
public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
//http://jmenv.tbsite.net:8080/rocketmq/nsaddr
//public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
public static final String BENCHMARK_TOPIC = "BenchmarkTest";
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
......@@ -65,8 +63,6 @@ public class MixAll {
public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP";
public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP";
public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY";
public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION";
public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
......@@ -80,14 +76,9 @@ public class MixAll {
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static final String REPLY_MESSAGE_FLAG = "reply";
......@@ -115,10 +106,6 @@ public class MixAll {
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
}
public static boolean isSystemTopic(final String topic) {
return topic.startsWith(SYSTEM_TOPIC_PREFIX);
}
public static String getDLQTopic(final String consumerGroup) {
return DLQ_GROUP_TOPIC_PREFIX + consumerGroup;
}
......
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.common.protocol;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
public class NamespaceUtil {
public static final char NAMESPACE_SEPARATOR = '%';
......@@ -155,11 +156,11 @@ public class NamespaceUtil {
return false;
}
if (MixAll.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) {
if (TopicValidator.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) {
return true;
}
return MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC.equals(resource);
return false;
}
public static boolean isRetryTopic(String resource) {
......
......@@ -14,21 +14,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
package org.apache.rocketmq.common.topic;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TopicValidator {
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
public static final String RMQ_SYS_BENCHMARK_TOPIC = "BenchmarkTest";
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
public static final String RMQ_SYS_SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
private static final int TOPIC_MAX_LENGTH = 127;
private static final Set<String> SYSTEM_TOPIC_SET = new HashSet<String>();
/**
* Topics'set which client can not send msg!
*/
private static final Set<String> NOT_ALLOWED_SEND_TOPIC_SET = new HashSet<String>();
static {
SYSTEM_TOPIC_SET.add(AUTO_CREATE_TOPIC_KEY_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_BENCHMARK_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_HALF_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_TRACE_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_OP_HALF_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT);
NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC);
}
private static boolean regularExpressionMatcher(String origin, Pattern pattern) {
if (pattern == null) {
return true;
......@@ -57,13 +92,44 @@ public class TopicValidator {
return false;
}
//whether the same with system reserved keyword
if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
return true;
}
public static boolean isSystemTopic(String topic, RemotingCommand response) {
if (isSystemTopic(topic)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.");
return false;
response.setRemark("The topic[" + topic + "] is conflict with system topic.");
return true;
}
return false;
}
return true;
public static boolean isSystemTopic(String topic) {
return SYSTEM_TOPIC_SET.contains(topic) || topic.startsWith(SYSTEM_TOPIC_PREFIX);
}
public static boolean isNotAllowedSendTopic(String topic) {
return NOT_ALLOWED_SEND_TOPIC_SET.contains(topic);
}
public static boolean isNotAllowedSendTopic(String topic, RemotingCommand response) {
if (isNotAllowedSendTopic(topic)) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("Sending message to topic[" + topic + "] is forbidden.");
return true;
}
return false;
}
public static void addSystemTopic(String systemTopic) {
SYSTEM_TOPIC_SET.add(systemTopic);
}
public static Set<String> getSystemTopicSet() {
return SYSTEM_TOPIC_SET;
}
public static Set<String> getNotAllowedSendTopicSet() {
return NOT_ALLOWED_SEND_TOPIC_SET;
}
}
......@@ -13,11 +13,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
*/
package org.apache.rocketmq.common.topic;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Test;
......@@ -40,18 +40,11 @@ public class TopicValidatorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).contains("The specified topic contains illegal characters");
clearResponse(response);
res = TopicValidator.validateTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, response);
assertThat(res).isFalse();
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).contains("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.");
clearResponse(response);
res = TopicValidator.validateTopic(generateString(128), response);
assertThat(res).isFalse();
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).contains("The specified topic is longer than topic max length.");
}
@Test
......@@ -64,6 +57,76 @@ public class TopicValidatorTest {
assertThat(response.getRemark()).isEmpty();
}
@Test
public void testAddSystemTopic() {
String topic = "SYSTEM_TOPIC_TEST";
TopicValidator.addSystemTopic(topic);
assertThat(TopicValidator.getSystemTopicSet()).contains(topic);
}
@Test
public void testIsSystemTopic() {
boolean res;
for (String topic : TopicValidator.getSystemTopicSet()) {
res = TopicValidator.isSystemTopic(topic);
assertThat(res).isTrue();
}
String topic = TopicValidator.SYSTEM_TOPIC_PREFIX + "_test";
res = TopicValidator.isSystemTopic(topic);
assertThat(res).isTrue();
topic = "test_not_system_topic";
res = TopicValidator.isSystemTopic(topic);
assertThat(res).isFalse();
}
@Test
public void testIsSystemTopicWithResponse() {
RemotingCommand response = RemotingCommand.createResponseCommand(-1, "");
boolean res;
for (String topic : TopicValidator.getSystemTopicSet()) {
res = TopicValidator.isSystemTopic(topic, response);
assertThat(res).isTrue();
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic.");
}
String topic = "test_not_system_topic";
res = TopicValidator.isSystemTopic(topic, response);
assertThat(res).isFalse();
}
@Test
public void testIsNotAllowedSendTopic() {
boolean res;
for (String topic : TopicValidator.getNotAllowedSendTopicSet()) {
res = TopicValidator.isNotAllowedSendTopic(topic);
assertThat(res).isTrue();
}
String topic = "test_allowed_send_topic";
res = TopicValidator.isNotAllowedSendTopic(topic);
assertThat(res).isFalse();
}
@Test
public void testIsNotAllowedSendTopicWithResponse() {
RemotingCommand response = RemotingCommand.createResponseCommand(-1, "");
boolean res;
for (String topic : TopicValidator.getNotAllowedSendTopicSet()) {
res = TopicValidator.isNotAllowedSendTopic(topic, response);
assertThat(res).isTrue();
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
assertThat(response.getRemark()).isEqualTo("Sending message to topic[" + topic + "] is forbidden.");
}
String topic = "test_allowed_send_topic";
res = TopicValidator.isNotAllowedSendTopic(topic, response);
assertThat(res).isFalse();
}
private static void clearResponse(RemotingCommand response) {
response.setCode(-1);
response.setRemark("");
......
......@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
......@@ -340,7 +341,7 @@ public class CommitLog {
// Timing message processing
{
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);
if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
......@@ -577,7 +578,7 @@ public class CommitLog {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
......@@ -799,7 +800,7 @@ public class CommitLog {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
......
......@@ -50,6 +50,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
......@@ -1019,7 +1020,7 @@ public class DefaultMessageStore implements MessageStore {
Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
String topic = next.getKey();
if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
for (ConsumeQueue cq : queueTable.values()) {
cq.destroy();
......@@ -1050,7 +1051,7 @@ public class DefaultMessageStore implements MessageStore {
while (it.hasNext()) {
Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
String topic = next.getKey();
if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator();
while (itQT.hasNext()) {
......
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.CommitLog;
......@@ -383,7 +384,7 @@ public class DLedgerCommitLog extends CommitLog {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
......
......@@ -25,9 +25,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -48,7 +48,6 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
public class ScheduleMessageService extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
private static final long FIRST_DELAY_TIME = 1000L;
private static final long DELAY_FOR_A_WHILE = 100L;
private static final long DELAY_FOR_A_PERIOD = 10000L;
......@@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager {
Map.Entry<Integer, Long> next = it.next();
int queueId = delayLevel2QueueId(next.getKey());
long delayOffset = next.getValue();
long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId);
long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, queueId);
String value = String.format("%d,%d", delayOffset, maxOffset);
String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey());
stats.put(key, value);
......@@ -262,7 +261,7 @@ public class ScheduleMessageService extends ConfigManager {
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
......@@ -306,7 +305,7 @@ public class ScheduleMessageService extends ConfigManager {
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (MixAll.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
......
......@@ -26,7 +26,6 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
......@@ -51,6 +50,7 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -62,7 +62,7 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack;
public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private String adminExtGroup = "admin_ext_group";
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
private long timeoutMillis = 5000;
public DefaultMQAdminExt() {
......
......@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
......@@ -83,7 +84,7 @@ public class MonitorService {
try {
this.defaultMQPushConsumer.setConsumeThreadMin(1);
this.defaultMQPushConsumer.setConsumeThreadMax(1);
this.defaultMQPushConsumer.subscribe(MixAll.OFFSET_MOVED_EVENT, "*");
this.defaultMQPushConsumer.subscribe(TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT, "*");
this.defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册