From b6307dfce1c201877f7a27691cad74b842c0d971 Mon Sep 17 00:00:00 2001 From: Jun <62043001+coder-zzzz@users.noreply.github.com> Date: Fri, 5 Jun 2020 18:28:56 +0800 Subject: [PATCH] [ISSUE #1976]System topic should add permission checking globally (#1985) * fix #1976 System Topic like SCHEDULE_TOPIC_XXXX should not be create or delete by user * User can not send message to system topic SCHEDULE_TOPIC_XXXX * the-->The * fix magic number 18 * move system topic to TopicValidator except TBW102 * move TBW102 and isSystemTopic to TopicValidator * add test code * validateTopic --> validatorSystemTopic, validatorBlacklistTopic * validateTopic --> validatorSystemTopic, validatorBlacklistTopic * rename some methods --- .../AbstractSendMessageProcessor.java | 5 +- .../processor/AdminBrokerProcessor.java | 65 ++++----- .../processor/PullMessageProcessor.java | 3 +- .../broker/topic/TopicConfigManager.java | 59 ++++---- .../rocketmq/broker/topic/TopicValidator.java | 69 --------- .../TransactionalMessageServiceImpl.java | 4 +- .../queue/TransactionalMessageUtil.java | 5 +- .../processor/AdminBrokerProcessorTest.java | 97 ++++++++++++- .../processor/ReplyMessageProcessorTest.java | 5 +- .../processor/SendMessageProcessorTest.java | 4 +- ...TransactionalMessageCheckListenerTest.java | 4 +- .../queue/TransactionalMessageBridgeTest.java | 4 +- .../TransactionalMessageServiceImplTest.java | 18 +-- .../apache/rocketmq/client/Validators.java | 17 ++- .../rocketmq/client/impl/MQAdminImpl.java | 1 + .../rocketmq/client/impl/MQClientAPIImpl.java | 2 +- .../client/impl/factory/MQClientInstance.java | 2 +- .../impl/producer/DefaultMQProducerImpl.java | 1 + .../client/producer/DefaultMQProducer.java | 3 +- .../client/trace/AsyncTraceDispatcher.java | 4 +- .../rocketmq/client/trace/TraceConstants.java | 4 +- .../rocketmq/client/ValidatorsTest.java | 40 ++++-- .../trace/DefaultMQConsumerWithTraceTest.java | 3 +- .../trace/DefaultMQProducerWithTraceTest.java | 4 +- .../apache/rocketmq/common/BrokerConfig.java | 3 +- .../org/apache/rocketmq/common/MixAll.java | 13 -- .../common/protocol/NamespaceUtil.java | 5 +- .../rocketmq/common/topic/TopicValidator.java | 135 ++++++++++++++++++ .../common}/topic/TopicValidatorTest.java | 83 +++++++++-- .../org/apache/rocketmq/store/CommitLog.java | 7 +- .../rocketmq/store/DefaultMessageStore.java | 5 +- .../store/dledger/DLedgerCommitLog.java | 3 +- .../schedule/ScheduleMessageService.java | 9 +- .../tools/admin/DefaultMQAdminExt.java | 4 +- .../tools/monitor/MonitorService.java | 3 +- 35 files changed, 463 insertions(+), 230 deletions(-) delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java rename {broker/src/test/java/org/apache/rocketmq/broker => common/src/test/java/org/apache/rocketmq/common}/topic/TopicValidatorTest.java (52%) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 55b93920..ced7c201 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -25,7 +25,7 @@ import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; -import org.apache.rocketmq.broker.topic.TopicValidator; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; @@ -176,6 +176,9 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) { return response; } + if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) { + return response; + } TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index eb811832..dcdb7016 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -19,17 +19,6 @@ package org.apache.rocketmq.broker.processor; import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.io.UnsupportedEncodingException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.broker.BrokerController; @@ -37,8 +26,8 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; -import org.apache.rocketmq.broker.topic.TopicValidator; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -137,6 +126,18 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; +import java.io.UnsupportedEncodingException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -252,29 +253,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { - String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; - log.warn(errorMsg); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(errorMsg); - return response; - } + String topic = requestHeader.getTopic(); - if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) { + if (!TopicValidator.validateTopic(topic, response)) { return response; } - - try { - response.setCode(ResponseCode.SUCCESS); - response.setOpaque(request.getOpaque()); - response.markResponseType(); - response.setRemark(null); - ctx.writeAndFlush(response); - } catch (Exception e) { - log.error("Failed to produce a proper response", e); + if (TopicValidator.isSystemTopic(topic, response)) { + return response; } - TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic()); + TopicConfig topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); @@ -285,7 +273,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); - return null; + response.setCode(ResponseCode.SUCCESS); + return response; } private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, @@ -296,7 +285,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); + String topic = requestHeader.getTopic(); + if (!TopicValidator.validateTopic(topic, response)) { + return response; + } + if (TopicValidator.isSystemTopic(topic, response)) { + return response; + } + + this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); this.brokerController.getMessageStore() .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) { @@ -430,7 +427,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements responseHeader.setBrokerAddr(this.brokerController.getBrokerAddr()); responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); responseHeader.setClusterName(this.brokerController.getBrokerConfig().getBrokerClusterName()); - + response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; @@ -1118,7 +1115,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - Set topics = this.brokerController.getTopicConfigManager().getSystemTopic(); + Set topics = TopicValidator.getSystemTopicSet(); TopicList topicList = new TopicList(); topicList.setTopicList(topics); response.setBody(topicList.encode()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index b02b5a05..957dbbaa 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -523,7 +524,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements private void generateOffsetMovedEvent(final OffsetMovedEvent event) { try { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT); + msgInner.setTopic(TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT); msgInner.setTags(event.getConsumerGroup()); msgInner.setDelayTimeLevel(0); msgInner.setKeys(event.getConsumerGroup()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 199b46d6..86f60659 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.topic; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -37,18 +36,20 @@ import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; public class TopicConfigManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; + private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18; + private transient final Lock lockTopicConfigTable = new ReentrantLock(); private final ConcurrentMap topicConfigTable = new ConcurrentHashMap(1024); private final DataVersion dataVersion = new DataVersion(); - private final Set systemTopicList = new HashSet(); private transient BrokerController brokerController; public TopicConfigManager() { @@ -57,20 +58,18 @@ public class TopicConfigManager extends ConfigManager { public TopicConfigManager(BrokerController brokerController) { this.brokerController = brokerController; { - // MixAll.SELF_TEST_TOPIC - String topic = MixAll.SELF_TEST_TOPIC; + String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { - // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { - String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; + String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() @@ -81,10 +80,9 @@ public class TopicConfigManager extends ConfigManager { } } { - // MixAll.BENCHMARK_TOPIC - String topic = MixAll.BENCHMARK_TOPIC; + String topic = TopicValidator.RMQ_SYS_BENCHMARK_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1024); topicConfig.setWriteQueueNums(1024); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); @@ -93,7 +91,7 @@ public class TopicConfigManager extends ConfigManager { String topic = this.brokerController.getBrokerConfig().getBrokerClusterName(); TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); int perm = PermName.PERM_INHERIT; if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) { perm |= PermName.PERM_READ | PermName.PERM_WRITE; @@ -105,7 +103,7 @@ public class TopicConfigManager extends ConfigManager { String topic = this.brokerController.getBrokerConfig().getBrokerName(); TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); int perm = PermName.PERM_INHERIT; if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) { perm |= PermName.PERM_READ | PermName.PERM_WRITE; @@ -116,19 +114,26 @@ public class TopicConfigManager extends ConfigManager { this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { - // MixAll.OFFSET_MOVED_EVENT - String topic = MixAll.OFFSET_MOVED_EVENT; + String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } + { + String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + TopicValidator.addSystemTopic(topic); + topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); + topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } { if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); @@ -137,21 +142,13 @@ public class TopicConfigManager extends ConfigManager { { String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX; TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); + TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } } - public boolean isSystemTopic(final String topic) { - return this.systemTopicList.contains(topic); - } - - public Set getSystemTopic() { - return this.systemTopicList; - } - public TopicConfig selectTopicConfig(final String topic) { return this.topicConfigTable.get(topic); } @@ -170,7 +167,7 @@ public class TopicConfigManager extends ConfigManager { TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); if (defaultTopicConfig != null) { - if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) { if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); } @@ -275,7 +272,7 @@ public class TopicConfigManager extends ConfigManager { } public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) { - TopicConfig topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + TopicConfig topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) return topicConfig; @@ -284,18 +281,18 @@ public class TopicConfigManager extends ConfigManager { try { if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) return topicConfig; - topicConfig = new TopicConfig(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + topicConfig = new TopicConfig(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); topicConfig.setReadQueueNums(clientDefaultTopicQueueNums); topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums); topicConfig.setPerm(perm); topicConfig.setTopicSysFlag(0); log.info("create new topic {}", topicConfig); - this.topicConfigTable.put(MixAll.TRANS_CHECK_MAX_TIME_TOPIC, topicConfig); + this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, topicConfig); createNew = true; this.dataVersion.nextVersion(); this.persist(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java deleted file mode 100644 index 58b1cc86..00000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.broker.topic; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; - -public class TopicValidator { - - private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; - private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); - private static final int TOPIC_MAX_LENGTH = 127; - - private static boolean regularExpressionMatcher(String origin, Pattern pattern) { - if (pattern == null) { - return true; - } - Matcher matcher = pattern.matcher(origin); - return matcher.matches(); - } - - public static boolean validateTopic(String topic, RemotingCommand response) { - - if (UtilAll.isBlank(topic)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is blank."); - return false; - } - - if (!regularExpressionMatcher(topic, PATTERN)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR); - return false; - } - - if (topic.length() > TOPIC_MAX_LENGTH) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is longer than topic max length."); - return false; - } - - //whether the same with system reserved keyword - if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC."); - return false; - } - - return true; - } -} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 25065ebe..1f5e01ef 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -21,13 +21,13 @@ import org.apache.rocketmq.broker.transaction.OperationResult; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -127,7 +127,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) { try { - String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC; + String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; Set msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageUtil.java index 3042b4c3..e6baf026 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageUtil.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageUtil.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.transaction.queue; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; import java.nio.charset.Charset; @@ -25,11 +26,11 @@ public class TransactionalMessageUtil { public static Charset charset = Charset.forName("utf-8"); public static String buildOpTopic() { - return MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC; + return TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC; } public static String buildHalfTopic() { - return MixAll.RMQ_SYS_TRANS_HALF_TOPIC; + return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; } public static String buildConsumerGroup() { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index ec0a879c..e31862cd 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -16,17 +16,22 @@ */ package org.apache.rocketmq.broker.processor; +import com.google.common.collect.Sets; import io.netty.channel.ChannelHandlerContext; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; @@ -47,6 +52,10 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.Set; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; @@ -61,17 +70,31 @@ public class AdminBrokerProcessorTest { @Spy private BrokerController - brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), - new MessageStoreConfig()); + brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), + new MessageStoreConfig()); @Mock private MessageStore messageStore; + private Set systemTopicSet; @Before public void init() { brokerController.setMessageStore(messageStore); adminBrokerProcessor = new AdminBrokerProcessor(brokerController); + + systemTopicSet = Sets.newHashSet( + TopicValidator.RMQ_SYS_SELF_TEST_TOPIC, + TopicValidator.RMQ_SYS_BENCHMARK_TOPIC, + TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, + TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT, + TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, + this.brokerController.getBrokerConfig().getBrokerClusterName(), + this.brokerController.getBrokerConfig().getBrokerName(), + this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX); + if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { + systemTopicSet.add(this.brokerController.getBrokerConfig().getMsgTraceTopicName()); + } } @Test @@ -94,6 +117,67 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); } + @Test + public void testUpdateAndCreateTopic() throws Exception { + //test system topic + for (String topic : systemTopicSet) { + RemotingCommand request = buildCreateTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic."); + } + + //test validate error topic + String topic = ""; + RemotingCommand request = buildCreateTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + topic = "TEST_CREATE_TOPIC"; + request = buildCreateTopicRequest(topic); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + } + + @Test + public void testDeleteTopic() throws Exception { + //test system topic + for (String topic : systemTopicSet) { + RemotingCommand request = buildDeleteTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic."); + } + + String topic = "TEST_DELETE_TOPIC"; + RemotingCommand request = buildDeleteTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + private RemotingCommand buildCreateTopicRequest(String topic) { + CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name()); + requestHeader.setReadQueueNums(8); + requestHeader.setWriteQueueNums(8); + requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); + request.makeCustomHeaderToNet(); + return request; + } + + private RemotingCommand buildDeleteTopicRequest(String topic) { + DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); + requestHeader.setTopic(topic); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader); + request.makeCustomHeaderToNet(); + return request; + } + private MessageExt createDefaultMessageExt() { MessageExt messageExt = new MessageExt(); messageExt.setMsgId("12345678"); @@ -106,10 +190,11 @@ public class AdminBrokerProcessorTest { return messageExt; } - private SelectMappedBufferResult createSelectMappedBufferResult(){ - SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024) ,0, new MappedFile()); + private SelectMappedBufferResult createSelectMappedBufferResult() { + SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024), 0, new MappedFile()); return result; } + private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() { ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader(); header.setMsgId("C0A803CA00002A9F0000000000031367"); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java index 85c77504..eeca6e76 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java @@ -25,15 +25,14 @@ import java.util.Map; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.net.Broker2Client; -import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -115,7 +114,7 @@ public class ReplyMessageProcessorTest { SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(group); requestHeader.setTopic(topic); - requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC); + requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC); requestHeader.setDefaultTopicQueueNums(3); requestHeader.setQueueId(1); requestHeader.setSysFlag(0); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index bdf13d4a..b9344e90 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -23,7 +23,6 @@ import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -32,6 +31,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; @@ -238,7 +238,7 @@ public class SendMessageProcessorTest { SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(group); requestHeader.setTopic(topic); - requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC); + requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC); requestHeader.setDefaultTopicQueueNums(3); requestHeader.setQueueId(1); requestHeader.setSysFlag(0); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java index 653a9693..f6035463 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java @@ -19,10 +19,10 @@ package org.apache.rocketmq.broker.transaction.queue; import java.net.InetSocketAddress; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -92,7 +92,7 @@ public class DefaultTransactionalMessageCheckListenerTest { @Test public void testResolveDiscardMsg() { MessageExt messageExt = new MessageExt(); - messageExt.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); + messageExt.setTopic(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC); messageExt.setQueueId(0); messageExt.setBody("test resolve discard msg".getBytes()); messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 10911)); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java index 5d8c2b95..031a55a5 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java @@ -20,11 +20,11 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.AppendMessageResult; @@ -98,7 +98,7 @@ public class TransactionalMessageBridgeTest { @Test public void testFetchMessageQueues() { - Set messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); + Set messageQueues = transactionBridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC); assertThat(messageQueues.size()).isEqualTo(1); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java index 47eccbe0..8b138fc8 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java @@ -23,13 +23,13 @@ import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.AppendMessageResult; @@ -108,10 +108,10 @@ public class TransactionalMessageServiceImplTest { @Test public void testCheck_withDiscard() { - when(bridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)); - when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createDiscardPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hellp", 1)); - when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0)); - when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createOpPulResult(MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "10", 1)); + when(bridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)); + when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createDiscardPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hellp", 1)); + when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0)); + when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createOpPulResult(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "10", 1)); long timeOut = this.brokerController.getBrokerConfig().getTransactionTimeOut(); int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax(); final AtomicInteger checkMessage = new AtomicInteger(0); @@ -128,10 +128,10 @@ public class TransactionalMessageServiceImplTest { @Test public void testCheck_withCheck() { - when(bridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)); - when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1)); - when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0)); - when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0)); + when(bridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)); + when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1)); + when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0)); + when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0)); when(bridge.getBrokerController()).thenReturn(this.brokerController); when(bridge.renewHalfMessageInner(any(MessageExtBrokerInner.class))).thenReturn(createMessageBrokerInner()); when(bridge.putMessageReturnResult(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index d77faf3c..e712e2f5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -21,10 +21,10 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.topic.TopicValidator; /** * Common Validator @@ -85,6 +85,7 @@ public class Validators { } // topic Validators.checkTopic(msg.getTopic()); + Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { @@ -116,11 +117,19 @@ public class Validators { throw new MQClientException( String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null); } + } + + public static void isSystemTopic(String topic) throws MQClientException { + if (TopicValidator.isSystemTopic(topic)) { + throw new MQClientException( + String.format("The topic[%s] is conflict with system topic.", topic), null); + } + } - //whether the same with system reserved keyword - if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + public static void isNotAllowedSendTopic(String topic) throws MQClientException { + if (TopicValidator.isNotAllowedSendTopic(topic)) { throw new MQClientException( - String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null); + String.format("Sending message to topic[%s] is forbidden.", topic), null); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 9dbd5520..2128ffd0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -82,6 +82,7 @@ public class MQAdminImpl { public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { try { Validators.checkTopic(newTopic); + Validators.isSystemTopic(newTopic); TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis); List brokerDataList = topicRouteData.getBrokerDatas(); if (brokerDataList != null && !brokerDataList.isEmpty()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 073e3679..c64d7c56 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1364,7 +1364,7 @@ public class MQClientAPIImpl { assert response != null; switch (response.getCode()) { case ResponseCode.TOPIC_NOT_EXIST: { - if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + if (allowTopicNotExist) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 886203fe..e937ce39 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -671,7 +671,7 @@ public class MQClientInstance { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (MQClientException e) { - if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } catch (RemotingException e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index bff14578..43dffd38 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -421,6 +421,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.makeSureStateOK(); Validators.checkTopic(newTopic); + Validators.isSystemTopic(newTopic); this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 04573410..09969504 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -74,7 +75,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Just for testing or demo program */ - private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; + private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; /** * Number of queues to create per default topic. diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 8c3d8865..1af6e60a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -41,11 +41,11 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -90,7 +90,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { if (!UtilAll.isBlank(traceTopicName)) { this.traceTopicName = traceTopicName; } else { - this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; + this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC; } this.traceExecutor = new ThreadPoolExecutor(// 10, // diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index cb4a2465..27622cd3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.client.trace; -import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; public class TraceConstants { @@ -24,5 +24,5 @@ public class TraceConstants { public static final char CONTENT_SPLITOR = (char) 1; public static final char FIELD_SPLITOR = (char) 2; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; - public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_"; + public static final String TRACE_TOPIC_PREFIX = TopicValidator.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_"; } diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java index e2b9abd4..343fe4bc 100644 --- a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java @@ -19,11 +19,12 @@ package org.apache.rocketmq.client; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.junit.Assert.fail; public class ValidatorsTest { @@ -47,17 +48,6 @@ public class ValidatorsTest { } } - @Test - public void testCheckTopic_UseDefaultTopic() { - String defaultTopic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; - try { - Validators.checkTopic(defaultTopic); - failBecauseExceptionWasNotThrown(MQClientException.class); - } catch (MQClientException e) { - assertThat(e).hasMessageStartingWith(String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", defaultTopic)); - } - } - @Test public void testCheckTopic_BlankTopic() { String blankTopic = ""; @@ -80,4 +70,30 @@ public class ValidatorsTest { assertThat(e).hasMessageStartingWith("The specified topic is longer than topic max length"); } } + + @Test + public void testIsSystemTopic() { + for (String topic : TopicValidator.getSystemTopicSet()) { + try { + Validators.isSystemTopic(topic); + fail("excepted MQClientException for system topic"); + } catch (MQClientException e) { + assertThat(e.getResponseCode()).isEqualTo(-1); + assertThat(e.getErrorMessage()).isEqualTo(String.format("The topic[%s] is conflict with system topic.", topic)); + } + } + } + + @Test + public void testIsNotAllowedSendTopic() { + for (String topic : TopicValidator.getNotAllowedSendTopicSet()) { + try { + Validators.isNotAllowedSendTopic(topic); + fail("excepted MQClientException for blacklist topic"); + } catch (MQClientException e) { + assertThat(e.getResponseCode()).isEqualTo(-1); + assertThat(e.getErrorMessage()).isEqualTo(String.format("Sending message to topic[%s] is forbidden.", topic)); + } + } + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 496c5143..6c1380b1 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; @@ -90,7 +91,7 @@ import static org.mockito.Mockito.when; public class DefaultMQConsumerWithTraceTest { private String consumerGroup; private String consumerGroupNormal; - private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); private String topic = "FooBar"; private String brokerName = "BrokerA"; diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 3759acba..da573a25 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -31,12 +31,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; @@ -75,7 +75,7 @@ public class DefaultMQProducerWithTraceTest { private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); - private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); private String customerTraceTopic = "rmq_trace_topic_12345"; @Before diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index bfe8a210..d80b3d21 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -21,6 +21,7 @@ import java.net.UnknownHostException; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; @@ -52,7 +53,7 @@ public class BrokerConfig { private boolean autoCreateSubscriptionGroup = true; private String messageStorePlugIn = ""; @ImportantField - private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; + private String msgTraceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC; @ImportantField private boolean traceTopicEnable = false; /** diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index de259c93..9d95ecb5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -55,8 +55,6 @@ public class MixAll { public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); //http://jmenv.tbsite.net:8080/rocketmq/nsaddr //public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; - public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable - public static final String BENCHMARK_TOPIC = "BenchmarkTest"; public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER"; @@ -65,8 +63,6 @@ public class MixAll { public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"; public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP"; public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP"; - public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; - public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"; public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY"; public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION"; public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER"; @@ -80,14 +76,9 @@ public class MixAll { public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC"; - public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY"; public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion"; public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; - public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; - public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC"; - public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; - public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC"; public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; public static final String REPLY_MESSAGE_FLAG = "reply"; @@ -115,10 +106,6 @@ public class MixAll { return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); } - public static boolean isSystemTopic(final String topic) { - return topic.startsWith(SYSTEM_TOPIC_PREFIX); - } - public static String getDLQTopic(final String consumerGroup) { return DLQ_GROUP_TOPIC_PREFIX + consumerGroup; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java index afd53761..4827844d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.common.protocol; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.topic.TopicValidator; public class NamespaceUtil { public static final char NAMESPACE_SEPARATOR = '%'; @@ -155,11 +156,11 @@ public class NamespaceUtil { return false; } - if (MixAll.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) { + if (TopicValidator.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) { return true; } - return MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC.equals(resource); + return false; } public static boolean isRetryTopic(String resource) { diff --git a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java new file mode 100644 index 00000000..7b0a8394 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.topic; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class TopicValidator { + + public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable + public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; + public static final String RMQ_SYS_BENCHMARK_TOPIC = "BenchmarkTest"; + public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; + public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC"; + public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; + public static final String RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC"; + public static final String RMQ_SYS_SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; + public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"; + + public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_"; + + private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; + private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); + private static final int TOPIC_MAX_LENGTH = 127; + + private static final Set SYSTEM_TOPIC_SET = new HashSet(); + + /** + * Topics'set which client can not send msg! + */ + private static final Set NOT_ALLOWED_SEND_TOPIC_SET = new HashSet(); + + static { + SYSTEM_TOPIC_SET.add(AUTO_CREATE_TOPIC_KEY_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_BENCHMARK_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_HALF_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_TRACE_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_OP_HALF_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC); + SYSTEM_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT); + + NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC); + } + + private static boolean regularExpressionMatcher(String origin, Pattern pattern) { + if (pattern == null) { + return true; + } + Matcher matcher = pattern.matcher(origin); + return matcher.matches(); + } + + public static boolean validateTopic(String topic, RemotingCommand response) { + + if (UtilAll.isBlank(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic is blank."); + return false; + } + + if (!regularExpressionMatcher(topic, PATTERN)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR); + return false; + } + + if (topic.length() > TOPIC_MAX_LENGTH) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic is longer than topic max length."); + return false; + } + + return true; + } + + public static boolean isSystemTopic(String topic, RemotingCommand response) { + if (isSystemTopic(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The topic[" + topic + "] is conflict with system topic."); + return true; + } + return false; + } + + public static boolean isSystemTopic(String topic) { + return SYSTEM_TOPIC_SET.contains(topic) || topic.startsWith(SYSTEM_TOPIC_PREFIX); + } + + public static boolean isNotAllowedSendTopic(String topic) { + return NOT_ALLOWED_SEND_TOPIC_SET.contains(topic); + } + + public static boolean isNotAllowedSendTopic(String topic, RemotingCommand response) { + if (isNotAllowedSendTopic(topic)) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("Sending message to topic[" + topic + "] is forbidden."); + return true; + } + return false; + } + + public static void addSystemTopic(String systemTopic) { + SYSTEM_TOPIC_SET.add(systemTopic); + } + + public static Set getSystemTopicSet() { + return SYSTEM_TOPIC_SET; + } + + public static Set getNotAllowedSendTopicSet() { + return NOT_ALLOWED_SEND_TOPIC_SET; + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java b/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java similarity index 52% rename from broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java rename to common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java index 78be63fc..bb49417b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java @@ -13,11 +13,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ -package org.apache.rocketmq.broker.topic; + */ +package org.apache.rocketmq.common.topic; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.Test; @@ -40,18 +40,11 @@ public class TopicValidatorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); assertThat(response.getRemark()).contains("The specified topic contains illegal characters"); - clearResponse(response); - res = TopicValidator.validateTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, response); - assertThat(res).isFalse(); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).contains("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC."); - clearResponse(response); res = TopicValidator.validateTopic(generateString(128), response); assertThat(res).isFalse(); assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); assertThat(response.getRemark()).contains("The specified topic is longer than topic max length."); - } @Test @@ -64,6 +57,76 @@ public class TopicValidatorTest { assertThat(response.getRemark()).isEmpty(); } + @Test + public void testAddSystemTopic() { + String topic = "SYSTEM_TOPIC_TEST"; + TopicValidator.addSystemTopic(topic); + assertThat(TopicValidator.getSystemTopicSet()).contains(topic); + } + + @Test + public void testIsSystemTopic() { + boolean res; + for (String topic : TopicValidator.getSystemTopicSet()) { + res = TopicValidator.isSystemTopic(topic); + assertThat(res).isTrue(); + } + + String topic = TopicValidator.SYSTEM_TOPIC_PREFIX + "_test"; + res = TopicValidator.isSystemTopic(topic); + assertThat(res).isTrue(); + + topic = "test_not_system_topic"; + res = TopicValidator.isSystemTopic(topic); + assertThat(res).isFalse(); + } + + @Test + public void testIsSystemTopicWithResponse() { + RemotingCommand response = RemotingCommand.createResponseCommand(-1, ""); + boolean res; + for (String topic : TopicValidator.getSystemTopicSet()) { + res = TopicValidator.isSystemTopic(topic, response); + assertThat(res).isTrue(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).isEqualTo("The topic[" + topic + "] is conflict with system topic."); + } + + String topic = "test_not_system_topic"; + res = TopicValidator.isSystemTopic(topic, response); + assertThat(res).isFalse(); + } + + @Test + public void testIsNotAllowedSendTopic() { + boolean res; + for (String topic : TopicValidator.getNotAllowedSendTopicSet()) { + res = TopicValidator.isNotAllowedSendTopic(topic); + assertThat(res).isTrue(); + } + + String topic = "test_allowed_send_topic"; + res = TopicValidator.isNotAllowedSendTopic(topic); + assertThat(res).isFalse(); + } + + @Test + public void testIsNotAllowedSendTopicWithResponse() { + RemotingCommand response = RemotingCommand.createResponseCommand(-1, ""); + + boolean res; + for (String topic : TopicValidator.getNotAllowedSendTopicSet()) { + res = TopicValidator.isNotAllowedSendTopic(topic, response); + assertThat(res).isTrue(); + assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION); + assertThat(response.getRemark()).isEqualTo("Sending message to topic[" + topic + "] is forbidden."); + } + + String topic = "test_allowed_send_topic"; + res = TopicValidator.isNotAllowedSendTopic(topic, response); + assertThat(res).isFalse(); + } + private static void clearResponse(RemotingCommand response) { response.setCode(-1); response.setRemark(""); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 810110ee..b6d17daa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; @@ -340,7 +341,7 @@ public class CommitLog { // Timing message processing { String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); - if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) { + if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) { int delayLevel = Integer.parseInt(t); if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { @@ -577,7 +578,7 @@ public class CommitLog { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } - topic = ScheduleMessageService.SCHEDULE_TOPIC; + topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId @@ -799,7 +800,7 @@ public class CommitLog { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } - topic = ScheduleMessageService.SCHEDULE_TOPIC; + topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 900e9af3..87af9323 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; @@ -1019,7 +1020,7 @@ public class DefaultMessageStore implements MessageStore { Entry> next = it.next(); String topic = next.getKey(); - if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { + if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) { ConcurrentMap queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); @@ -1050,7 +1051,7 @@ public class DefaultMessageStore implements MessageStore { while (it.hasNext()) { Entry> next = it.next(); String topic = next.getKey(); - if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { + if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) { ConcurrentMap queueTable = next.getValue(); Iterator> itQT = queueTable.entrySet().iterator(); while (itQT.hasNext()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 3d748bf8..3361b63f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.CommitLog; @@ -383,7 +384,7 @@ public class DLedgerCommitLog extends CommitLog { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } - topic = ScheduleMessageService.SCHEDULE_TOPIC; + topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 3be8cbc8..3b19a16f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -25,9 +25,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; @@ -48,7 +48,6 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; public class ScheduleMessageService extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; private static final long FIRST_DELAY_TIME = 1000L; private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; @@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager { Map.Entry next = it.next(); int queueId = delayLevel2QueueId(next.getKey()); long delayOffset = next.getValue(); - long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId); + long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, queueId); String value = String.format("%d,%d", delayOffset, maxOffset); String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey()); stats.put(key, value); @@ -262,7 +261,7 @@ public class ScheduleMessageService extends ConfigManager { public void executeOnTimeup() { ConsumeQueue cq = - ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, + ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; @@ -306,7 +305,7 @@ public class ScheduleMessageService extends ConfigManager { if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); - if (MixAll.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { + if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 1ca3fe4c..e80a813e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -26,7 +26,6 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; @@ -51,6 +50,7 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -62,7 +62,7 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack; public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { private final DefaultMQAdminExtImpl defaultMQAdminExtImpl; private String adminExtGroup = "admin_ext_group"; - private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; + private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; private long timeoutMillis = 5000; public DefaultMQAdminExt() { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java index 9bf09ad4..94f588da 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -83,7 +84,7 @@ public class MonitorService { try { this.defaultMQPushConsumer.setConsumeThreadMin(1); this.defaultMQPushConsumer.setConsumeThreadMax(1); - this.defaultMQPushConsumer.subscribe(MixAll.OFFSET_MOVED_EVENT, "*"); + this.defaultMQPushConsumer.subscribe(TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT, "*"); this.defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override -- GitLab