From f8f6fbe4aa7f5dee937e688322628c366b12a552 Mon Sep 17 00:00:00 2001 From: Heng Du Date: Thu, 12 Dec 2019 11:39:33 +0800 Subject: [PATCH] [ISSUE #1637]Fix 1637 (#1644) * fix(broker): add the check logic of the server to the topic * chore(test):add unit test * chore(validator):polish the code * chore(test):add ASF license header --- .../AbstractSendMessageProcessor.java | 11 +-- .../processor/AdminBrokerProcessor.java | 29 ++++--- .../broker/topic/TopicConfigManager.java | 4 - .../rocketmq/broker/topic/TopicValidator.java | 69 ++++++++++++++++ .../broker/topic/TopicValidatorTest.java | 80 +++++++++++++++++++ .../apache/rocketmq/client/Validators.java | 7 +- .../rocketmq/client/impl/MQAdminImpl.java | 2 + .../rocketmq/client/impl/MQClientAPIImpl.java | 6 +- .../org/apache/rocketmq/common/UtilAll.java | 4 +- 9 files changed, 178 insertions(+), 34 deletions(-) create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index b0668d49..adf02799 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -16,16 +16,16 @@ */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; import java.util.Map; import java.util.Random; - -import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; +import org.apache.rocketmq.broker.topic.TopicValidator; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; @@ -171,11 +171,8 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces + "] sending message is forbidden"); return response; } - if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { - String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; - log.warn(errorMsg); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(errorMsg); + + if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) { return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index d63cc201..df0ec905 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -38,25 +38,18 @@ import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; +import org.apache.rocketmq.broker.topic.TopicValidator; +import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; -import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; -import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; @@ -84,10 +77,15 @@ import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody; +import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; @@ -110,6 +108,7 @@ import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; @@ -118,6 +117,8 @@ import org.apache.rocketmq.common.stats.StatsItem; import org.apache.rocketmq.common.stats.StatsSnapshot; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.filter.util.BitsArray; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -258,6 +259,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } + if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) { + return response; + } + try { response.setCode(ResponseCode.SUCCESS); response.setOpaque(request.getOpaque()); @@ -312,8 +317,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { accessConfig.setWhiteRemoteAddress(requestHeader.getWhiteRemoteAddress()); accessConfig.setDefaultTopicPerm(requestHeader.getDefaultTopicPerm()); accessConfig.setDefaultGroupPerm(requestHeader.getDefaultGroupPerm()); - accessConfig.setTopicPerms(UtilAll.String2List(requestHeader.getTopicPerms(),",")); - accessConfig.setGroupPerms(UtilAll.String2List(requestHeader.getGroupPerms(),",")); + accessConfig.setTopicPerms(UtilAll.string2List(requestHeader.getTopicPerms(), ",")); + accessConfig.setGroupPerms(UtilAll.string2List(requestHeader.getGroupPerms(), ",")); accessConfig.setAdmin(requestHeader.isAdmin()); try { @@ -386,7 +391,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { try { AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class); - if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.String2List(requestHeader.getGlobalWhiteAddrs(),","))) { + if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.string2List(requestHeader.getGlobalWhiteAddrs(), ","))) { response.setCode(ResponseCode.SUCCESS); response.setOpaque(request.getOpaque()); response.markResponseType(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index cb290111..199b46d6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -152,10 +152,6 @@ public class TopicConfigManager extends ConfigManager { return this.systemTopicList; } - public boolean isTopicCanSendMessage(final String topic) { - return !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC); - } - public TopicConfig selectTopicConfig(final String topic) { return this.topicConfigTable.get(topic); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java new file mode 100644 index 00000000..8b534764 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicValidator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class TopicValidator { + + private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; + private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); + private static final int CHARACTER_MAX_LENGTH = 255; + + private static boolean regularExpressionMatcher(String origin, Pattern pattern) { + if (pattern == null) { + return true; + } + Matcher matcher = pattern.matcher(origin); + return matcher.matches(); + } + + public static boolean validateTopic(String topic, RemotingCommand response) { + + if (UtilAll.isBlank(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic is blank."); + return false; + } + + if (!regularExpressionMatcher(topic, PATTERN)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR); + return false; + } + + if (topic.length() > CHARACTER_MAX_LENGTH) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic is longer than topic max length 255."); + return false; + } + + //whether the same with system reserved keyword + if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC."); + return false; + } + + return true; + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java new file mode 100644 index 00000000..267931fd --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicValidatorTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TopicValidatorTest { + + @Test + public void testTopicValidator_NotPass() { + RemotingCommand response = RemotingCommand.createResponseCommand(-1, ""); + + Boolean res = TopicValidator.validateTopic("", response); + assertThat(res).isFalse(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).contains("The specified topic is blank"); + + clearResponse(response); + res = TopicValidator.validateTopic("../TopicTest", response); + assertThat(res).isFalse(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).contains("The specified topic contains illegal characters"); + + clearResponse(response); + res = TopicValidator.validateTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, response); + assertThat(res).isFalse(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).contains("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC."); + + clearResponse(response); + res = TopicValidator.validateTopic(generateString(255), response); + assertThat(res).isFalse(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).contains("The specified topic is longer than topic max length 255."); + + } + + @Test + public void testTopicValidator_Pass() { + RemotingCommand response = RemotingCommand.createResponseCommand(-1, ""); + + Boolean res = TopicValidator.validateTopic("TestTopic", response); + assertThat(res).isTrue(); + assertThat(response.getCode()).isEqualTo(-1); + assertThat(response.getRemark()).isEmpty(); + } + + private static void clearResponse(RemotingCommand response) { + response.setCode(-1); + response.setRemark(""); + } + + private static String generateString(int length) { + StringBuilder stringBuffer = new StringBuilder(); + String tmpStr = "0123456789"; + for (int i = 0; i < length; i++) { + stringBuffer.append(tmpStr); + } + return stringBuffer.toString(); + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index 1b96cd05..a37a17b6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -77,9 +77,6 @@ public class Validators { return matcher.matches(); } - /** - * Validate message - */ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { @@ -103,9 +100,6 @@ public class Validators { } } - /** - * Validate topic - */ public static void checkTopic(String topic) throws MQClientException { if (UtilAll.isBlank(topic)) { throw new MQClientException("The specified topic is blank", null); @@ -127,4 +121,5 @@ public class Validators { String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null); } } + } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index ca89d613..9dbd5520 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -80,6 +81,7 @@ public class MQAdminImpl { public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { try { + Validators.checkTopic(newTopic); TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis); List brokerDataList = topicRouteData.getBrokerDatas(); if (brokerDataList != null && !brokerDataList.isEmpty()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 1ad5fbfe..9380f4b7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -43,12 +43,12 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; @@ -305,8 +305,8 @@ public class MQClientAPIImpl { requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm()); requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm()); requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress()); - requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ",")); - requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ",")); + requestHeader.setTopicPerms(UtilAll.list2String(plainAccessConfig.getTopicPerms(), ",")); + requestHeader.setGroupPerms(UtilAll.list2String(plainAccessConfig.getGroupPerms(), ",")); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader); diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 222f697e..624199c4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -580,7 +580,7 @@ public class UtilAll { } } - public static String List2String(List list, String splitor) { + public static String list2String(List list, String splitor) { if (list == null || list.size() == 0) { return null; } @@ -595,7 +595,7 @@ public class UtilAll { return str.toString(); } - public static List String2List(String str, String splitor) { + public static List string2List(String str, String splitor) { if (StringUtils.isEmpty(str)) { return null; } -- GitLab