diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 55b939201561f3645ebb5fff332b397c5d027ed4..ced7c2014f4a5d24b5832971142e686ebc404df0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -25,7 +25,7 @@ import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; -import org.apache.rocketmq.broker.topic.TopicValidator; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; @@ -176,6 +176,9 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) { return response; } + if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) { + return response; + } TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index eb811832058e8343df8aafcbc0e78a96a3106618..dcdb701635884c243121f39dc09de67fbe1a226d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -19,17 +19,6 @@ package org.apache.rocketmq.broker.processor; import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.io.UnsupportedEncodingException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.broker.BrokerController; @@ -37,8 +26,8 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; -import org.apache.rocketmq.broker.topic.TopicValidator; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -137,6 +126,18 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; +import java.io.UnsupportedEncodingException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -252,29 +253,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { - String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; - log.warn(errorMsg); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(errorMsg); - return response; - } + String topic = requestHeader.getTopic(); - if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) { + if (!TopicValidator.validateTopic(topic, response)) { return response; } - - try { - response.setCode(ResponseCode.SUCCESS); - response.setOpaque(request.getOpaque()); - response.markResponseType(); - response.setRemark(null); - ctx.writeAndFlush(response); - } catch (Exception e) { - log.error("Failed to produce a proper response", e); + if (TopicValidator.isSystemTopic(topic, response)) { + return response; } - TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic()); + TopicConfig topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); @@ -285,7 +273,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); - return null; + response.setCode(ResponseCode.SUCCESS); + return response; } private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, @@ -296,7 +285,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); + String topic = requestHeader.getTopic(); + if (!TopicValidator.validateTopic(topic, response)) { + return response; + } + if (TopicValidator.isSystemTopic(topic, response)) { + return response; + } + + this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); this.brokerController.getMessageStore() .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) { @@ -430,7 +427,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements responseHeader.setBrokerAddr(this.brokerController.getBrokerAddr()); responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); responseHeader.setClusterName(this.brokerController.getBrokerConfig().getBrokerClusterName()); - + response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; @@ -1118,7 +1115,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - Set topics = this.brokerController.getTopicConfigManager().getSystemTopic(); + Set topics = TopicValidator.getSystemTopicSet(); TopicList topicList = new TopicList(); topicList.setTopicList(topics); response.setBody(topicList.encode()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index b02b5a058838a5eb6305d5f65cd856213536a805..957dbbaaa32972c6b0e7af035737ee3dc314aeb3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -523,7 +524,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements private void generateOffsetMovedEvent(final OffsetMovedEvent event) { try { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT); + msgInner.setTopic(TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT); msgInner.setTags(event.getConsumerGroup()); msgInner.setDelayTimeLevel(0); msgInner.setKeys(event.getConsumerGroup()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 199b46d603a72ffb4561d21a6e146df291228390..86f606593f3db18fce0c3f470c807e22f3ba1516 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.topic; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -37,18 +36,20 @@ import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; public class TopicConfigManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; + private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18; + private transient final Lock lockTopicConfigTable = new ReentrantLock(); private final ConcurrentMap topicConfigTable = new ConcurrentHashMap(1024); private final DataVersion dataVersion = new DataVersion(); - private final Set systemTopicList = new HashSet(); private transient BrokerController brokerController; public TopicConfigManager() { @@ -57,20 +58,18 @@ public class TopicConfigManager extends ConfigManager { public TopicConfigManager(BrokerController brokerController) { this.brokerController = brokerController; { - // MixAll.SELF_TEST_TOPIC - String topic = MixAll.SELF_TEST_TOPIC; + String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { - // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { - String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; + String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() @@ -81,10 +80,9 @@ public class TopicConfigManager extends ConfigManager { } } { - // MixAll.BENCHMARK_TOPIC - String topic = MixAll.BENCHMARK_TOPIC; + String topic = TopicValidator.RMQ_SYS_BENCHMARK_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1024); topicConfig.setWriteQueueNums(1024); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); @@ -93,7 +91,7 @@ public class TopicConfigManager extends ConfigManager { String topic = this.brokerController.getBrokerConfig().getBrokerClusterName(); TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); int perm = PermName.PERM_INHERIT; if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) { perm |= PermName.PERM_READ | PermName.PERM_WRITE; @@ -105,7 +103,7 @@ public class TopicConfigManager extends ConfigManager { String topic = this.brokerController.getBrokerConfig().getBrokerName(); TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); int perm = PermName.PERM_INHERIT; if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) { perm |= PermName.PERM_READ | PermName.PERM_WRITE; @@ -116,19 +114,26 @@ public class TopicConfigManager extends ConfigManager { this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { - // MixAll.OFFSET_MOVED_EVENT - String topic = MixAll.OFFSET_MOVED_EVENT; + String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } + { + String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + TopicValidator.addSystemTopic(topic); + topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); + topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } { if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); @@ -137,21 +142,13 @@ public class TopicConfigManager extends ConfigManager { { String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } } - public boolean isSystemTopic(final String topic) { - return this.systemTopicList.contains(topic); - } - - public Set getSystemTopic() { - return this.systemTopicList; - } - public TopicConfig selectTopicConfig(final String topic) { return this.topicConfigTable.get(topic); } @@ -170,7 +167,7 @@ public class TopicConfigManager extends ConfigManager { TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); if (defaultTopicConfig != null) { - if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) { if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); } @@ -275,7 +272,7 @@ public class TopicConfigManager extends ConfigManager { } public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) { - TopicConfig topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + TopicConfig topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) return topicConfig; @@ -284,18 +281,18 @@ public class TopicConfigManager extends ConfigManager { try { if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) return topicConfig; - topicConfig = new TopicConfig(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + topicConfig = new TopicConfig(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); topicConfig.setReadQueueNums(clientDefaultTopicQueueNums); topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums); topicConfig.setPerm(perm); topicConfig.setTopicSysFlag(0); log.info("create new topic {}", topicConfig); - this.topicConfigTable.put(MixAll.TRANS_CHECK_MAX_TIME_TOPIC, topicConfig); + this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, topicConfig); createNew = true; this.dataVersion.nextVersion(); this.persist(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java deleted file mode 100644 index 58b1cc86fcdfcdbf38d3c3f9656008fae052b8c6..0000000000000000000000000000000000000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.broker.topic; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; - -public class TopicValidator { - - private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; - private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); - private static final int TOPIC_MAX_LENGTH = 127; - - private static boolean regularExpressionMatcher(String origin, Pattern pattern) { - if (pattern == null) { - return true; - } - Matcher matcher = pattern.matcher(origin); - return matcher.matches(); - } - - public static boolean validateTopic(String topic, RemotingCommand response) { - - if (UtilAll.isBlank(topic)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is blank."); - return false; - } - - if (!regularExpressionMatcher(topic, PATTERN)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR); - return false; - } - - if (topic.length() > TOPIC_MAX_LENGTH) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is longer than topic max length."); - return false; - } - - //whether the same with system reserved keyword - if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC."); - return false; - } - - return true; - } -} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 25065ebe885f5644aa128f96e09a6583ad0ede65..1f5e01efa43518a4d6806d435432576ac9dbbe36 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -21,13 +21,13 @@ import org.apache.rocketmq.broker.transaction.OperationResult; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -127,7 +127,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) { try { - String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC; + String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; Set msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageUtil.java index 3042b4c3e568479407178072bae798a8751a76cd..e6baf0266ccf431a249514b91565092f844541c9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageUtil.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageUtil.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.transaction.queue; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; import java.nio.charset.Charset; @@ -25,11 +26,11 @@ public class TransactionalMessageUtil { public static Charset charset = Charset.forName("utf-8"); public static String buildOpTopic() { - return MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC; + return TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC; } public static String buildHalfTopic() { - return MixAll.RMQ_SYS_TRANS_HALF_TOPIC; + return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; } public static String buildConsumerGroup() { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index ec0a879c209c97d7c8ab2f63f254e343b0b62fba..e31862cd63e4cd30ffd578646de8a1a72dd44afe 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -16,17 +16,22 @@ */ package org.apache.rocketmq.broker.processor; +import com.google.common.collect.Sets; import io.netty.channel.ChannelHandlerContext; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; @@ -47,6 +52,10 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.Set; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; @@ -61,17 +70,31 @@ public class AdminBrokerProcessorTest { @Spy private BrokerController - brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), - new MessageStoreConfig()); + brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), + new MessageStoreConfig()); @Mock private MessageStore messageStore; + private Set systemTopicSet; @Before public void init() { brokerController.setMessageStore(messageStore); adminBrokerProcessor = new AdminBrokerProcessor(brokerController); + + systemTopicSet = Sets.newHashSet( + TopicValidator.RMQ_SYS_SELF_TEST_TOPIC, + TopicValidator.RMQ_SYS_BENCHMARK_TOPIC, + TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, + TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT, + TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, + this.brokerController.getBrokerConfig().getBrokerClusterName(), + this.brokerController.getBrokerConfig().getBrokerName(), + this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX); + if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { + systemTopicSet.add(this.brokerController.getBrokerConfig().getMsgTraceTopicName()); + } } @Test @@ -94,6 +117,67 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); } + @Test + public void testUpdateAndCreateTopic() throws Exception { + //test system topic + for (String topic : systemTopicSet) { + RemotingCommand request = buildCreateTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic."); + } + + //test validate error topic + String topic = ""; + RemotingCommand request = buildCreateTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + topic = "TEST_CREATE_TOPIC"; + request = buildCreateTopicRequest(topic); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + } + + @Test + public void testDeleteTopic() throws Exception { + //test system topic + for (String topic : systemTopicSet) { + RemotingCommand request = buildDeleteTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic."); + } + + String topic = "TEST_DELETE_TOPIC"; + RemotingCommand request = buildDeleteTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + private RemotingCommand buildCreateTopicRequest(String topic) { + CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name()); + requestHeader.setReadQueueNums(8); + requestHeader.setWriteQueueNums(8); + requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); + request.makeCustomHeaderToNet(); + return request; + } + + private RemotingCommand buildDeleteTopicRequest(String topic) { + DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); + requestHeader.setTopic(topic); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader); + request.makeCustomHeaderToNet(); + return request; + } + private MessageExt createDefaultMessageExt() { MessageExt messageExt = new MessageExt(); messageExt.setMsgId("12345678"); @@ -106,10 +190,11 @@ public class AdminBrokerProcessorTest { return messageExt; } - private SelectMappedBufferResult createSelectMappedBufferResult(){ - SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024) ,0, new MappedFile()); + private SelectMappedBufferResult createSelectMappedBufferResult() { + SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024), 0, new MappedFile()); return result; } + private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() { ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader(); header.setMsgId("C0A803CA00002A9F0000000000031367"); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java index 85c775040f53ef6093e979c8c6adc53a3a36161f..eeca6e7622c5d5e37bc7be493adcc9df668e5eab 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java @@ -25,15 +25,14 @@ import java.util.Map; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.net.Broker2Client; -import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -115,7 +114,7 @@ public class ReplyMessageProcessorTest { SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(group); requestHeader.setTopic(topic); - requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC); + requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC); requestHeader.setDefaultTopicQueueNums(3); requestHeader.setQueueId(1); requestHeader.setSysFlag(0); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index bdf13d4a7faa7334861df540d9e5910996ad4b0a..b9344e90ed600f29ac72c2720b60fbbff1cda5d4 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -23,7 +23,6 @@ import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -32,6 +31,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; @@ -238,7 +238,7 @@ public class SendMessageProcessorTest { SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(group); requestHeader.setTopic(topic); - requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC); + requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC); requestHeader.setDefaultTopicQueueNums(3); requestHeader.setQueueId(1); requestHeader.setSysFlag(0); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java index 653a96933ed5decb6b19a0086008c96d90ad7c77..f6035463c01a4588f69f7094d02a331416284b5a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java @@ -19,10 +19,10 @@ package org.apache.rocketmq.broker.transaction.queue; import java.net.InetSocketAddress; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -92,7 +92,7 @@ public class DefaultTransactionalMessageCheckListenerTest { @Test public void testResolveDiscardMsg() { MessageExt messageExt = new MessageExt(); - messageExt.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); + messageExt.setTopic(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC); messageExt.setQueueId(0); messageExt.setBody("test resolve discard msg".getBytes()); messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 10911)); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java index 5d8c2b955d6cadac46eb8302a7d79bcaeb6951fa..031a55a5d9d74af1151896834d8f5d8656084894 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java @@ -20,11 +20,11 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.AppendMessageResult; @@ -98,7 +98,7 @@ public class TransactionalMessageBridgeTest { @Test public void testFetchMessageQueues() { - Set messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); + Set messageQueues = transactionBridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC); assertThat(messageQueues.size()).isEqualTo(1); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java index 47eccbe026ee5f1e9a04092cc90d1af7f2e512b8..8b138fc896a5e929fbfa8f5e1f9b91d9cc139a98 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java @@ -23,13 +23,13 @@ import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.AppendMessageResult; @@ -108,10 +108,10 @@ public class TransactionalMessageServiceImplTest { @Test public void testCheck_withDiscard() { - when(bridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)); - when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createDiscardPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hellp", 1)); - when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0)); - when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createOpPulResult(MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "10", 1)); + when(bridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)); + when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createDiscardPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hellp", 1)); + when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0)); + when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createOpPulResult(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "10", 1)); long timeOut = this.brokerController.getBrokerConfig().getTransactionTimeOut(); int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax(); final AtomicInteger checkMessage = new AtomicInteger(0); @@ -128,10 +128,10 @@ public class TransactionalMessageServiceImplTest { @Test public void testCheck_withCheck() { - when(bridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)); - when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1)); - when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0)); - when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0)); + when(bridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)); + when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1)); + when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0)); + when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0)); when(bridge.getBrokerController()).thenReturn(this.brokerController); when(bridge.renewHalfMessageInner(any(MessageExtBrokerInner.class))).thenReturn(createMessageBrokerInner()); when(bridge.putMessageReturnResult(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index d77faf3c222dbcc828c3182ceb4301ba25072d10..e712e2f5e46a2c34e9f21eea73790e0ec8d5d126 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -21,10 +21,10 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.topic.TopicValidator; /** * Common Validator @@ -85,6 +85,7 @@ public class Validators { } // topic Validators.checkTopic(msg.getTopic()); + Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { @@ -116,11 +117,19 @@ public class Validators { throw new MQClientException( String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null); } + } + + public static void isSystemTopic(String topic) throws MQClientException { + if (TopicValidator.isSystemTopic(topic)) { + throw new MQClientException( + String.format("The topic[%s] is conflict with system topic.", topic), null); + } + } - //whether the same with system reserved keyword - if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + public static void isNotAllowedSendTopic(String topic) throws MQClientException { + if (TopicValidator.isNotAllowedSendTopic(topic)) { throw new MQClientException( - String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null); + String.format("Sending message to topic[%s] is forbidden.", topic), null); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 9dbd55201dc6bea8ff1046e59be1fbd674a2b63b..2128ffd0dd3fd44bddee8699671007d6ba9450fc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -82,6 +82,7 @@ public class MQAdminImpl { public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { try { Validators.checkTopic(newTopic); + Validators.isSystemTopic(newTopic); TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis); List brokerDataList = topicRouteData.getBrokerDatas(); if (brokerDataList != null && !brokerDataList.isEmpty()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 073e3679be0462ed657a942c1a63ba0698f04931..c64d7c568ec4c250fdee1c6c5749f65ca9d3d3fb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1364,7 +1364,7 @@ public class MQClientAPIImpl { assert response != null; switch (response.getCode()) { case ResponseCode.TOPIC_NOT_EXIST: { - if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + if (allowTopicNotExist) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 886203fe7d405ec9dede6ecbf6165cedaed72d13..e937ce39bb8f0a60e68d413609df155da36dc195 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -671,7 +671,7 @@ public class MQClientInstance { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (MQClientException e) { - if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } catch (RemotingException e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index bff1457835fba85421573039410b2c79aeae6d8e..43dffd38e35703af9622eddfc89dc19e5c3746f9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -421,6 +421,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.makeSureStateOK(); Validators.checkTopic(newTopic); + Validators.isSystemTopic(newTopic); this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 045734103f571e798508e4dfab3eaac738586c5e..09969504b20684c2bcf9d4a5610f82f87c7b3859 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -74,7 +75,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Just for testing or demo program */ - private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; + private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; /** * Number of queues to create per default topic. diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 8c3d886517595ce2fb53d1012735d8fb7af32b50..1af6e60aab936187a4aafb25cf720a65eebc154a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -41,11 +41,11 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -90,7 +90,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { if (!UtilAll.isBlank(traceTopicName)) { this.traceTopicName = traceTopicName; } else { - this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; + this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC; } this.traceExecutor = new ThreadPoolExecutor(// 10, // diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index cb4a246517d62e711bd7b7dd171fb4e97d575482..27622cd30223602143d4d6948e906a6da59daf81 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.client.trace; -import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; public class TraceConstants { @@ -24,5 +24,5 @@ public class TraceConstants { public static final char CONTENT_SPLITOR = (char) 1; public static final char FIELD_SPLITOR = (char) 2; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; - public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_"; + public static final String TRACE_TOPIC_PREFIX = TopicValidator.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_"; } diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java index e2b9abd46f045bdefbc4f78bdc8112ac899b4563..343fe4bca69707da07858e84daea2d0927cd6722 100644 --- a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java @@ -19,11 +19,12 @@ package org.apache.rocketmq.client; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.junit.Assert.fail; public class ValidatorsTest { @@ -47,17 +48,6 @@ public class ValidatorsTest { } } - @Test - public void testCheckTopic_UseDefaultTopic() { - String defaultTopic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; - try { - Validators.checkTopic(defaultTopic); - failBecauseExceptionWasNotThrown(MQClientException.class); - } catch (MQClientException e) { - assertThat(e).hasMessageStartingWith(String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", defaultTopic)); - } - } - @Test public void testCheckTopic_BlankTopic() { String blankTopic = ""; @@ -80,4 +70,30 @@ public class ValidatorsTest { assertThat(e).hasMessageStartingWith("The specified topic is longer than topic max length"); } } + + @Test + public void testIsSystemTopic() { + for (String topic : TopicValidator.getSystemTopicSet()) { + try { + Validators.isSystemTopic(topic); + fail("excepted MQClientException for system topic"); + } catch (MQClientException e) { + assertThat(e.getResponseCode()).isEqualTo(-1); + assertThat(e.getErrorMessage()).isEqualTo(String.format("The topic[%s] is conflict with system topic.", topic)); + } + } + } + + @Test + public void testIsNotAllowedSendTopic() { + for (String topic : TopicValidator.getNotAllowedSendTopicSet()) { + try { + Validators.isNotAllowedSendTopic(topic); + fail("excepted MQClientException for blacklist topic"); + } catch (MQClientException e) { + assertThat(e.getResponseCode()).isEqualTo(-1); + assertThat(e.getErrorMessage()).isEqualTo(String.format("Sending message to topic[%s] is forbidden.", topic)); + } + } + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 496c5143e02387f5a39289f8a77ce61021fbd927..6c1380b12e8b06617d706d21f6a3aef8ca5ea95f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; @@ -90,7 +91,7 @@ import static org.mockito.Mockito.when; public class DefaultMQConsumerWithTraceTest { private String consumerGroup; private String consumerGroupNormal; - private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); private String topic = "FooBar"; private String brokerName = "BrokerA"; diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 3759acba139f9d7eb3d343855acbc27e338512b7..da573a25fa27592e4f2578555eafca6c2fb85924 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -31,12 +31,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; @@ -75,7 +75,7 @@ public class DefaultMQProducerWithTraceTest { private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); - private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); private String customerTraceTopic = "rmq_trace_topic_12345"; @Before diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index bfe8a2104db0b5a1781f066791f5b61f9fa19d12..d80b3d2169b13e9c8a5a439b02bd726763a20283 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -21,6 +21,7 @@ import java.net.UnknownHostException; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; @@ -52,7 +53,7 @@ public class BrokerConfig { private boolean autoCreateSubscriptionGroup = true; private String messageStorePlugIn = ""; @ImportantField - private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; + private String msgTraceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC; @ImportantField private boolean traceTopicEnable = false; /** diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index de259c931b9c4fbbc0ff97c3fb3cf2ad766ee064..9d95ecb5ea4cfaa7d445318f079afcbb658654a9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -55,8 +55,6 @@ public class MixAll { public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); //http://jmenv.tbsite.net:8080/rocketmq/nsaddr //public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; - public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable - public static final String BENCHMARK_TOPIC = "BenchmarkTest"; public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER"; @@ -65,8 +63,6 @@ public class MixAll { public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"; public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP"; public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP"; - public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; - public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"; public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY"; public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION"; public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER"; @@ -80,14 +76,9 @@ public class MixAll { public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC"; - public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY"; public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion"; public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; - public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; - public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC"; - public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; - public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC"; public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; public static final String REPLY_MESSAGE_FLAG = "reply"; @@ -115,10 +106,6 @@ public class MixAll { return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); } - public static boolean isSystemTopic(final String topic) { - return topic.startsWith(SYSTEM_TOPIC_PREFIX); - } - public static String getDLQTopic(final String consumerGroup) { return DLQ_GROUP_TOPIC_PREFIX + consumerGroup; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java index afd537612267ca04068e3338d8e67ea98fb3835b..4827844dee7b6c0d0ac29c5c1898d49b8bdefde1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.common.protocol; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; public class NamespaceUtil { public static final char NAMESPACE_SEPARATOR = '%'; @@ -155,11 +156,11 @@ public class NamespaceUtil { return false; } - if (MixAll.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) { + if (TopicValidator.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) { return true; } - return MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC.equals(resource); + return false; } public static boolean isRetryTopic(String resource) { diff --git a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java new file mode 100644 index 0000000000000000000000000000000000000000..7b0a8394a11aab76f2e33c520a8fb41f68f8cf78 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.topic; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class TopicValidator { + + public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable + public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; + public static final String RMQ_SYS_BENCHMARK_TOPIC = "BenchmarkTest"; + public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; + public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC"; + public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; + public static final String RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC"; + public static final String RMQ_SYS_SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; + public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"; + + public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; + + private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; + private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); + private static final int TOPIC_MAX_LENGTH = 127; + + private static final Set SYSTEM_TOPIC_SET = new HashSet(); + + /** + * Topics'set which client can not send msg! + */ + private static final Set NOT_ALLOWED_SEND_TOPIC_SET = new HashSet(); + + static { + SYSTEM_TOPIC_SET.add(AUTO_CREATE_TOPIC_KEY_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_BENCHMARK_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_HALF_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_TRACE_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_OP_HALF_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT); + + NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC); + } + + private static boolean regularExpressionMatcher(String origin, Pattern pattern) { + if (pattern == null) { + return true; + } + Matcher matcher = pattern.matcher(origin); + return matcher.matches(); + } + + public static boolean validateTopic(String topic, RemotingCommand response) { + + if (UtilAll.isBlank(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic is blank."); + return false; + } + + if (!regularExpressionMatcher(topic, PATTERN)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR); + return false; + } + + if (topic.length() > TOPIC_MAX_LENGTH) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic is longer than topic max length."); + return false; + } + + return true; + } + + public static boolean isSystemTopic(String topic, RemotingCommand response) { + if (isSystemTopic(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The topic[" + topic + "] is conflict with system topic."); + return true; + } + return false; + } + + public static boolean isSystemTopic(String topic) { + return SYSTEM_TOPIC_SET.contains(topic) || topic.startsWith(SYSTEM_TOPIC_PREFIX); + } + + public static boolean isNotAllowedSendTopic(String topic) { + return NOT_ALLOWED_SEND_TOPIC_SET.contains(topic); + } + + public static boolean isNotAllowedSendTopic(String topic, RemotingCommand response) { + if (isNotAllowedSendTopic(topic)) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("Sending message to topic[" + topic + "] is forbidden."); + return true; + } + return false; + } + + public static void addSystemTopic(String systemTopic) { + SYSTEM_TOPIC_SET.add(systemTopic); + } + + public static Set getSystemTopicSet() { + return SYSTEM_TOPIC_SET; + } + + public static Set getNotAllowedSendTopicSet() { + return NOT_ALLOWED_SEND_TOPIC_SET; + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java b/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java similarity index 52% rename from broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java rename to common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java index 78be63fc56aaf57c870c44cb095272604e651a78..bb49417b00cc3acc1bf4974a8bf5b6a9c0e768b3 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java @@ -13,11 +13,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ -package org.apache.rocketmq.broker.topic; + */ +package org.apache.rocketmq.common.topic; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.Test; @@ -40,18 +40,11 @@ public class TopicValidatorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); assertThat(response.getRemark()).contains("The specified topic contains illegal characters"); - clearResponse(response); - res = TopicValidator.validateTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, response); - assertThat(res).isFalse(); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).contains("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC."); - clearResponse(response); res = TopicValidator.validateTopic(generateString(128), response); assertThat(res).isFalse(); assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); assertThat(response.getRemark()).contains("The specified topic is longer than topic max length."); - } @Test @@ -64,6 +57,76 @@ public class TopicValidatorTest { assertThat(response.getRemark()).isEmpty(); } + @Test + public void testAddSystemTopic() { + String topic = "SYSTEM_TOPIC_TEST"; + TopicValidator.addSystemTopic(topic); + assertThat(TopicValidator.getSystemTopicSet()).contains(topic); + } + + @Test + public void testIsSystemTopic() { + boolean res; + for (String topic : TopicValidator.getSystemTopicSet()) { + res = TopicValidator.isSystemTopic(topic); + assertThat(res).isTrue(); + } + + String topic = TopicValidator.SYSTEM_TOPIC_PREFIX + "_test"; + res = TopicValidator.isSystemTopic(topic); + assertThat(res).isTrue(); + + topic = "test_not_system_topic"; + res = TopicValidator.isSystemTopic(topic); + assertThat(res).isFalse(); + } + + @Test + public void testIsSystemTopicWithResponse() { + RemotingCommand response = RemotingCommand.createResponseCommand(-1, ""); + boolean res; + for (String topic : TopicValidator.getSystemTopicSet()) { + res = TopicValidator.isSystemTopic(topic, response); + assertThat(res).isTrue(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic."); + } + + String topic = "test_not_system_topic"; + res = TopicValidator.isSystemTopic(topic, response); + assertThat(res).isFalse(); + } + + @Test + public void testIsNotAllowedSendTopic() { + boolean res; + for (String topic : TopicValidator.getNotAllowedSendTopicSet()) { + res = TopicValidator.isNotAllowedSendTopic(topic); + assertThat(res).isTrue(); + } + + String topic = "test_allowed_send_topic"; + res = TopicValidator.isNotAllowedSendTopic(topic); + assertThat(res).isFalse(); + } + + @Test + public void testIsNotAllowedSendTopicWithResponse() { + RemotingCommand response = RemotingCommand.createResponseCommand(-1, ""); + + boolean res; + for (String topic : TopicValidator.getNotAllowedSendTopicSet()) { + res = TopicValidator.isNotAllowedSendTopic(topic, response); + assertThat(res).isTrue(); + assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION); + assertThat(response.getRemark()).isEqualTo("Sending message to topic[" + topic + "] is forbidden."); + } + + String topic = "test_allowed_send_topic"; + res = TopicValidator.isNotAllowedSendTopic(topic, response); + assertThat(res).isFalse(); + } + private static void clearResponse(RemotingCommand response) { response.setCode(-1); response.setRemark(""); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 810110eeb67237342e6005e46fdc29b8e3f6c65f..b6d17daa4e54e2066290979e7b03760f64bb6738 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; @@ -340,7 +341,7 @@ public class CommitLog { // Timing message processing { String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); - if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) { + if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) { int delayLevel = Integer.parseInt(t); if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { @@ -577,7 +578,7 @@ public class CommitLog { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } - topic = ScheduleMessageService.SCHEDULE_TOPIC; + topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId @@ -799,7 +800,7 @@ public class CommitLog { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } - topic = ScheduleMessageService.SCHEDULE_TOPIC; + topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 900e9af31fa5d51c8342be4ee8cffa23df49e822..87af9323df2b39393a0ed6d0653888dd7619ecfd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; @@ -1019,7 +1020,7 @@ public class DefaultMessageStore implements MessageStore { Entry> next = it.next(); String topic = next.getKey(); - if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { + if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) { ConcurrentMap queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); @@ -1050,7 +1051,7 @@ public class DefaultMessageStore implements MessageStore { while (it.hasNext()) { Entry> next = it.next(); String topic = next.getKey(); - if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { + if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) { ConcurrentMap queueTable = next.getValue(); Iterator> itQT = queueTable.entrySet().iterator(); while (itQT.hasNext()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 3d748bf8900b5c92ae35cc8b9513109519ce8a86..3361b63f826670193854752cda088742d049ef51 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.CommitLog; @@ -383,7 +384,7 @@ public class DLedgerCommitLog extends CommitLog { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } - topic = ScheduleMessageService.SCHEDULE_TOPIC; + topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 3be8cbc809ed59f6d39de94cd407a01741a899be..3b19a16fd4b7fc238b9e5697a32c2672fd270e19 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -25,9 +25,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; @@ -48,7 +48,6 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; public class ScheduleMessageService extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; private static final long FIRST_DELAY_TIME = 1000L; private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; @@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager { Map.Entry next = it.next(); int queueId = delayLevel2QueueId(next.getKey()); long delayOffset = next.getValue(); - long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId); + long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, queueId); String value = String.format("%d,%d", delayOffset, maxOffset); String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey()); stats.put(key, value); @@ -262,7 +261,7 @@ public class ScheduleMessageService extends ConfigManager { public void executeOnTimeup() { ConsumeQueue cq = - ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, + ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; @@ -306,7 +305,7 @@ public class ScheduleMessageService extends ConfigManager { if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); - if (MixAll.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { + if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 1ca3fe4c2f482a286b485021544d74dca16d39bf..e80a813eca2dc9c3f965887154ba896c1272100e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -26,7 +26,6 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; @@ -51,6 +50,7 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -62,7 +62,7 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack; public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { private final DefaultMQAdminExtImpl defaultMQAdminExtImpl; private String adminExtGroup = "admin_ext_group"; - private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; + private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; private long timeoutMillis = 5000; public DefaultMQAdminExt() { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java index 9bf09ad410754be36bb2cb75e01761d83a02288c..94f588da27537575e8e99fc73b5e707425edc417 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -83,7 +84,7 @@ public class MonitorService { try { this.defaultMQPushConsumer.setConsumeThreadMin(1); this.defaultMQPushConsumer.setConsumeThreadMax(1); - this.defaultMQPushConsumer.subscribe(MixAll.OFFSET_MOVED_EVENT, "*"); + this.defaultMQPushConsumer.subscribe(TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT, "*"); this.defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override