未验证 提交 f8f6fbe4 编写于 作者: H Heng Du 提交者: GitHub

[ISSUE #1637]Fix 1637 (#1644)

* fix(broker): add the check logic of the server to the topic

* chore(test):add unit test

* chore(validator):polish the code

* chore(test):add ASF license header
上级 3d778ef6
......@@ -16,16 +16,16 @@
*/
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.topic.TopicValidator;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
......@@ -171,11 +171,8 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
+ "] sending message is forbidden");
return response;
}
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response;
}
......
......@@ -38,25 +38,18 @@ import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.broker.topic.TopicValidator;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -84,10 +77,15 @@ import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
......@@ -110,6 +108,7 @@ import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
......@@ -118,6 +117,8 @@ import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
......@@ -258,6 +259,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response;
}
try {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
......@@ -312,8 +317,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
accessConfig.setWhiteRemoteAddress(requestHeader.getWhiteRemoteAddress());
accessConfig.setDefaultTopicPerm(requestHeader.getDefaultTopicPerm());
accessConfig.setDefaultGroupPerm(requestHeader.getDefaultGroupPerm());
accessConfig.setTopicPerms(UtilAll.String2List(requestHeader.getTopicPerms(),","));
accessConfig.setGroupPerms(UtilAll.String2List(requestHeader.getGroupPerms(),","));
accessConfig.setTopicPerms(UtilAll.string2List(requestHeader.getTopicPerms(), ","));
accessConfig.setGroupPerms(UtilAll.string2List(requestHeader.getGroupPerms(), ","));
accessConfig.setAdmin(requestHeader.isAdmin());
try {
......@@ -386,7 +391,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
try {
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.String2List(requestHeader.getGlobalWhiteAddrs(),","))) {
if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.string2List(requestHeader.getGlobalWhiteAddrs(), ","))) {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
......
......@@ -152,10 +152,6 @@ public class TopicConfigManager extends ConfigManager {
return this.systemTopicList;
}
public boolean isTopicCanSendMessage(final String topic) {
return !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
}
public TopicConfig selectTopicConfig(final String topic) {
return this.topicConfigTable.get(topic);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class TopicValidator {
private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
private static final int CHARACTER_MAX_LENGTH = 255;
private static boolean regularExpressionMatcher(String origin, Pattern pattern) {
if (pattern == null) {
return true;
}
Matcher matcher = pattern.matcher(origin);
return matcher.matches();
}
public static boolean validateTopic(String topic, RemotingCommand response) {
if (UtilAll.isBlank(topic)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The specified topic is blank.");
return false;
}
if (!regularExpressionMatcher(topic, PATTERN)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR);
return false;
}
if (topic.length() > CHARACTER_MAX_LENGTH) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The specified topic is longer than topic max length 255.");
return false;
}
//whether the same with system reserved keyword
if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.");
return false;
}
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TopicValidatorTest {
@Test
public void testTopicValidator_NotPass() {
RemotingCommand response = RemotingCommand.createResponseCommand(-1, "");
Boolean res = TopicValidator.validateTopic("", response);
assertThat(res).isFalse();
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).contains("The specified topic is blank");
clearResponse(response);
res = TopicValidator.validateTopic("../TopicTest", response);
assertThat(res).isFalse();
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).contains("The specified topic contains illegal characters");
clearResponse(response);
res = TopicValidator.validateTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, response);
assertThat(res).isFalse();
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).contains("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.");
clearResponse(response);
res = TopicValidator.validateTopic(generateString(255), response);
assertThat(res).isFalse();
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).contains("The specified topic is longer than topic max length 255.");
}
@Test
public void testTopicValidator_Pass() {
RemotingCommand response = RemotingCommand.createResponseCommand(-1, "");
Boolean res = TopicValidator.validateTopic("TestTopic", response);
assertThat(res).isTrue();
assertThat(response.getCode()).isEqualTo(-1);
assertThat(response.getRemark()).isEmpty();
}
private static void clearResponse(RemotingCommand response) {
response.setCode(-1);
response.setRemark("");
}
private static String generateString(int length) {
StringBuilder stringBuffer = new StringBuilder();
String tmpStr = "0123456789";
for (int i = 0; i < length; i++) {
stringBuffer.append(tmpStr);
}
return stringBuffer.toString();
}
}
......@@ -77,9 +77,6 @@ public class Validators {
return matcher.matches();
}
/**
* Validate message
*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
if (null == msg) {
......@@ -103,9 +100,6 @@ public class Validators {
}
}
/**
* Validate topic
*/
public static void checkTopic(String topic) throws MQClientException {
if (UtilAll.isBlank(topic)) {
throw new MQClientException("The specified topic is blank", null);
......@@ -127,4 +121,5 @@ public class Validators {
String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null);
}
}
}
......@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
......@@ -80,6 +81,7 @@ public class MQAdminImpl {
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
try {
Validators.checkTopic(newTopic);
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
if (brokerDataList != null && !brokerDataList.isEmpty()) {
......
......@@ -43,12 +43,12 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
......@@ -305,8 +305,8 @@ public class MQClientAPIImpl {
requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ","));
requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ","));
requestHeader.setTopicPerms(UtilAll.list2String(plainAccessConfig.getTopicPerms(), ","));
requestHeader.setGroupPerms(UtilAll.list2String(plainAccessConfig.getGroupPerms(), ","));
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
......
......@@ -580,7 +580,7 @@ public class UtilAll {
}
}
public static String List2String(List<String> list, String splitor) {
public static String list2String(List<String> list, String splitor) {
if (list == null || list.size() == 0) {
return null;
}
......@@ -595,7 +595,7 @@ public class UtilAll {
return str.toString();
}
public static List<String> String2List(String str, String splitor) {
public static List<String> string2List(String str, String splitor) {
if (StringUtils.isEmpty(str)) {
return null;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册