diff --git a/client/pom.xml b/client/pom.xml
index 713523d692835af9ce8514b8479ab174d808e7e7..3a0f6ae02babdea99cb835156012ac77b721bd03 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -38,5 +38,9 @@
org.slf4j
slf4j-api
+
+ org.apache.commons
+ commons-lang3
+
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 7ea1bf91ddab23c5048a572c2dfe3e218cac9127..899efa684d550745911671ae085320280db6a033 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -118,23 +118,23 @@ public class Validators {
*/
public static void checkTopic(String topic) throws MQClientException {
if (UtilAll.isBlank(topic)) {
- throw new MQClientException("the specified topic is blank", null);
+ throw new MQClientException("The specified topic is blank", null);
}
if (!regularExpressionMatcher(topic, PATTERN)) {
throw new MQClientException(String.format(
- "the specified topic[%s] contains illegal characters, allowing only %s", topic,
+ "The specified topic[%s] contains illegal characters, allowing only %s", topic,
VALID_PATTERN_STR), null);
}
if (topic.length() > CHARACTER_MAX_LENGTH) {
- throw new MQClientException("the specified topic is longer than topic max length 255.", null);
+ throw new MQClientException("The specified topic is longer than topic max length 255.", null);
}
//whether the same with system reserved keyword
if (topic.equals(MixAll.DEFAULT_TOPIC)) {
throw new MQClientException(
- String.format("the topic[%s] is conflict with default topic.", topic), null);
+ String.format("The topic[%s] is conflict with default topic.", topic), null);
}
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
index a30b5daba5de7bf33962e97353b35b3959f8b44e..ab223c3ecac23c373716c4b2f5844c404328c892 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
@@ -23,10 +23,6 @@ public class ThreadLocalIndex {
private final ThreadLocal threadLocalIndex = new ThreadLocal();
private final Random random = new Random();
- public ThreadLocalIndex(int value) {
-
- }
-
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 52c5cfba02bf6f95fdf426dbabec80fd1f358ffd..deb02cff285b42c241e4c064ad376d47ebdecfa8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -27,7 +27,7 @@ public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List messageQueueList = new ArrayList();
- private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
+ private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
public boolean isOrderTopic() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 5309bc9a55996677375c2552e38eef85fe189220..72d43476f8371920e78153886b5ec4cff100f196 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.client.common.ThreadLocalIndex;
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16);
- private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0);
+ private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
index 6775404ccdf473d9a823ec68209dc1e2b3c97ad2..2db648d8626ea1e18de50a880ebdee621830fe7c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
@@ -17,17 +17,67 @@
package org.apache.rocketmq.client;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
import org.junit.Test;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
public class ValidatorsTest {
@Test
- public void topicValidatorTest() throws MQClientException {
+ public void testCheckTopic_Success() throws MQClientException {
Validators.checkTopic("Hello");
Validators.checkTopic("%RETRY%Hello");
Validators.checkTopic("_%RETRY%Hello");
Validators.checkTopic("-%RETRY%Hello");
Validators.checkTopic("223-%RETRY%Hello");
}
+
+ @Test
+ public void testCheckTopic_HasIllegalCharacters() {
+ String illegalTopic = "TOPIC&*^";
+ try {
+ Validators.checkTopic(illegalTopic);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageStartingWith(String.format("The specified topic[%s] contains illegal characters, allowing only %s", illegalTopic, Validators.VALID_PATTERN_STR));
+ }
+ }
+
+ @Test
+ public void testCheckTopic_UseDefaultTopic() {
+ String defaultTopic = MixAll.DEFAULT_TOPIC;
+ try {
+ Validators.checkTopic(defaultTopic);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageStartingWith(String.format("The topic[%s] is conflict with default topic.", defaultTopic));
+ }
+ }
+
+ @Test
+ public void testCheckTopic_BlankTopic() {
+ String blankTopic = "";
+ try {
+ Validators.checkTopic(blankTopic);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageStartingWith("The specified topic is blank");
+ }
+ }
+
+ @Test
+ public void testCheckTopic_TooLongTopic() {
+ String tooLongTopic = StringUtils.rightPad("TooLongTopic", Validators.CHARACTER_MAX_LENGTH + 1, "_");
+ assertThat(tooLongTopic.length()).isGreaterThan(Validators.CHARACTER_MAX_LENGTH);
+ try {
+ Validators.checkTopic(tooLongTopic);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageStartingWith("The specified topic is longer than topic max length 255.");
+ }
+ }
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..b937e457995ba24682bc8a1b981c558ff7168cc9
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.common;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ThreadLocalIndexTest {
+
+ @Test
+ public void getAndIncrement() throws Exception {
+ ThreadLocalIndex localIndex = new ThreadLocalIndex();
+ int initialVal = localIndex.getAndIncrement();
+
+ assertThat(localIndex.getAndIncrement()).isEqualTo(initialVal + 1);
+ }
+
+}
\ No newline at end of file