diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 94ebe4f12684543b13fa91c9640294d7cf3454fa..e9237b67c819fd3bd2988939a56f42cff56dc762 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.common.SslMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
@@ -97,6 +98,7 @@ public class BrokerStartup {
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ nettyClientConfig.setUseTLS(NettySystemConfig.sslMode != SslMode.DISABLED);
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
diff --git a/client/pom.xml b/client/pom.xml
index 2f3fb133a669644098f025114e0773a2ab9f595e..3f5c9617886f5bbc536ee56844fa829e9208133f 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -36,6 +36,12 @@
${project.groupId}rocketmq-common
+
+
+ io.netty
+ netty-tcnative
+
+ org.slf4j
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 950d75625b4b48f4c0cbf97fc64dd0a62597a065..8f255f012b13e5a67be48b368ff53bb144e50d81 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -45,6 +45,8 @@ public class ClientConfig {
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+ private boolean useTLS;
+
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
@@ -92,6 +94,7 @@ public class ClientConfig {
this.unitMode = cc.unitMode;
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
+ this.useTLS = cc.useTLS;
}
public ClientConfig cloneClientConfig() {
@@ -106,6 +109,7 @@ public class ClientConfig {
cc.unitMode = unitMode;
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
+ cc.useTLS = useTLS;
return cc;
}
@@ -173,12 +177,20 @@ public class ClientConfig {
this.vipChannelEnabled = vipChannelEnabled;
}
+ public boolean isUseTLS() {
+ return useTLS;
+ }
+
+ public void setUseTLS(boolean useTLS) {
+ this.useTLS = useTLS;
+ }
+
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
- + vipChannelEnabled + "]";
+ + vipChannelEnabled + ", useTLS=" + useTLS + "]";
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 31c2c3cc3ae8c01e5375b389d1ff8420327ac112..365fca9fa1065c2fe3641bd891dd3b6869c09040 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -127,6 +127,7 @@ public class MQClientInstance {
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
+ this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh
index b0c490e96685eae67ee107fae42bc598b512f5a8..74a28d15b965276ab871b8c036d696ab3abd9419 100644
--- a/distribution/bin/runbroker.sh
+++ b/distribution/bin/runbroker.sh
@@ -44,7 +44,7 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
-JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
+JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
diff --git a/distribution/bin/runserver.sh b/distribution/bin/runserver.sh
index 7c41b0c278a683a74bd80028ff68e8fc90ad95d7..cc4c8d7e738aae6a8d615964c5be1163da596455 100644
--- a/distribution/bin/runserver.sh
+++ b/distribution/bin/runserver.sh
@@ -41,7 +41,7 @@ JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
-JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
+JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
diff --git a/pom.xml b/pom.xml
index ab90868e9e8e143800751178609e43e0e1affd3f..e317472c0e681d81457c03aada12cf8e7592fa11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -550,7 +550,7 @@
io.nettynetty-all
- 4.0.36.Final
+ 4.0.42.Finalcom.alibaba
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 05b4274d67447664124c81454be265fdf081dbbd..61f0286a1a07e1d0c59afce72727308a49892a8a 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -45,5 +45,22 @@
org.slf4jslf4j-api
+
+
+ io.netty
+ netty-tcnative
+ 1.1.33.Fork22
+ ${os.detected.classifier}
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.4.0.Final
+
+
+
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 276a56539ba219121e3bf256055c06b6e280ddc0..b527408e7a4aa6e6838fbaa87306e35212cb9c56 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -27,24 +27,24 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
- public void updateNameServerAddressList(final List addrs);
+ void updateNameServerAddressList(final List addrs);
- public List getNameServerAddressList();
+ List getNameServerAddressList();
- public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
+ RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
- public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
+ void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
- public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
+ void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
- public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
+ void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
- public boolean isChannelWriteable(final String addr);
+ boolean isChannelWritable(final String addr);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java
new file mode 100644
index 0000000000000000000000000000000000000000..8801736b2d24cf2750caa2f3d669d22faf82a57a
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.common;
+
+/**
+ * For server, three SSL modes are supported: disabled, permissive and enforcing.
+ *
+ *
disable: SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.
+ *
permissive: SSL is optional, aka, server in this mode can serve client connections with or without SSL;
+ *
enforcing: SSL is required, aka, non SSL connection will be rejected.
+ *
+ */
+public enum SslMode {
+
+ DISABLED("disabled"),
+ PERMISSIVE("permissive"),
+ ENFORCING("enforcing");
+
+ private String name;
+
+ SslMode(String name) {
+ this.name = name;
+ }
+
+ public static SslMode parse(String mode) {
+ for (SslMode sslMode: SslMode.values()) {
+ if (sslMode.name.equals(mode)) {
+ return sslMode;
+ }
+ }
+
+ return PERMISSIVE;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
new file mode 100644
index 0000000000000000000000000000000000000000..2bd15aea3c279005178ce78e1271dd2dc02f274e
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import io.netty.handler.ssl.SslHandler;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ *
+ * By default, file region are directly transferred to socket channel which is known as zero copy. In case we need
+ * to encrypt transmission, data being sent should go through the {@link SslHandler}. This encoder ensures this
+ * process.
+ *
+ */
+public class FileRegionEncoder extends MessageToByteEncoder {
+
+ /**
+ * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that
+ * can be handled by this encoder.
+ *
+ * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
+ * io.netty.handler.codec.MessageToByteEncoder} belongs to
+ * @param msg the message to encode
+ * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
+ * @throws Exception is thrown if an error occurs
+ */
+ @Override
+ protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
+ WritableByteChannel writableByteChannel = new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ out.writeBytes(src);
+ return out.capacity();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ };
+
+ long toTransfer = msg.count();
+
+ while (true) {
+ long transferred = msg.transfered();
+ if (toTransfer - transferred <= 0) {
+ break;
+ }
+ msg.transferTo(writableByteChannel, transferred);
+ }
+ }
+}
\ No newline at end of file
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 9edaa54213e24c204c835bbb66f8f79b66ecfda3..fbc071b28f7d6ab390f56dae0ad44697840814bf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -38,6 +38,8 @@ public class NettyClientConfig {
private boolean clientPooledByteBufAllocatorEnable = false;
private boolean clientCloseSocketIfTimeout = false;
+ private boolean useTLS;
+
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
@@ -125,4 +127,12 @@ public class NettyClientConfig {
public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
}
+
+ public boolean isUseTLS() {
+ return useTLS;
+ }
+
+ public void setUseTLS(boolean useTLS) {
+ this.useTLS = useTLS;
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 614346285a91263bf0fccc47f547edf8f7cb1640..06918086ff1a8cc06240569b8d9c33176ed98660 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -20,6 +20,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
@@ -88,6 +90,11 @@ public abstract class NettyRemotingAbstract {
*/
protected Pair defaultRequestProcessor;
+ /**
+ * SSL context via which to create {@link SslHandler}.
+ */
+ protected SslContext sslContext;
+
/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
*
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 34f560fdab1f202665488962df18680fc12b0176..a96423c1fce494d5d8d32c16d0ebcb9520b7a755 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -34,6 +35,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.SocketAddress;
+import java.security.cert.CertificateException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -120,6 +123,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
+
+ if (nettyClientConfig.isUseTLS()) {
+ try {
+ sslContext = SslHelper.buildSslContext(true);
+ log.info("SSL enabled for client");
+ } catch (SSLException e) {
+ log.error("Failed to create SSLContext", e);
+ } catch (CertificateException e) {
+ log.error("Failed to create SSLContext", e);
+ throw new RuntimeException("Failed to create SSLContext", e);
+ }
+ }
}
private static int initValueIndex() {
@@ -151,7 +166,16 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
+ ChannelPipeline pipeline = ch.pipeline();
+ if (nettyClientConfig.isUseTLS()) {
+ if (null != sslContext) {
+ pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
+ log.info("Prepend SSL handler");
+ } else {
+ log.warn("Connections are insecure as SSLContext is null!");
+ }
+ }
+ pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
@@ -421,17 +445,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
- return cw.getChannel();
+ cw.getChannel().close();
+ channelTables.remove(addr);
}
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
- boolean createNewConnection = false;
+ boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
- return cw.getChannel();
+ cw.getChannel().close();
+ this.channelTables.remove(addr);
+ createNewConnection = true;
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
@@ -530,10 +557,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
@Override
- public boolean isChannelWriteable(String addr) {
+ public boolean isChannelWritable(String addr) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
- return cw.isWriteable();
+ return cw.isWritable();
}
return true;
}
@@ -569,7 +596,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
- public boolean isWriteable() {
+ public boolean isWritable() {
return this.channelFuture.channel().isWritable();
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 7cf82c96d0bddb1ac81a36cc9f55b2a2964d5538..ec1927a699f4406c07a765c5656a6231e0551827 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
@@ -37,12 +38,15 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -50,6 +54,7 @@ import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.common.SslMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
@@ -74,6 +79,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private int port = 0;
+ private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+ private static final String TLS_HANDLER_NAME = "sslHandler";
+ private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
@@ -129,6 +138,20 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
});
}
+
+ SslMode sslMode = NettySystemConfig.sslMode;
+ log.info("Server is running in TLS {} mode", sslMode.getName());
+
+ if (sslMode != SslMode.DISABLED) {
+ try {
+ sslContext = SslHelper.buildSslContext(false);
+ log.info("SSLContext created for server");
+ } catch (CertificateException e) {
+ log.error("Failed to create SSLContext for server", e);
+ } catch (SSLException e) {
+ log.error("Failed to create SSLContext for server", e);
+ }
+ }
}
private boolean useEpoll() {
@@ -164,13 +187,16 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
- defaultEventExecutorGroup,
- new NettyEncoder(),
- new NettyDecoder(),
- new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- new NettyConnectManageHandler(),
- new NettyServerHandler());
+ ch.pipeline()
+ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
+ new HandshakeHandler(NettySystemConfig.sslMode))
+ .addLast(defaultEventExecutorGroup,
+ new NettyEncoder(),
+ new NettyDecoder(),
+ new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ new NettyConnectManageHandler(),
+ new NettyServerHandler()
+ );
}
});
@@ -298,6 +324,68 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return this.publicExecutor;
}
+ class HandshakeHandler extends SimpleChannelInboundHandler {
+
+ private final SslMode sslMode;
+
+ private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
+
+ HandshakeHandler(SslMode sslMode) {
+ this.sslMode = sslMode;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+
+ // mark the current position so that we can peek the first byte to determine if the content is starting with
+ // TLS handshake
+ msg.markReaderIndex();
+
+ byte b = msg.getByte(0);
+
+ if (b == HANDSHAKE_MAGIC_CODE) {
+ switch (sslMode) {
+ case DISABLED:
+ ctx.close();
+ log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
+ break;
+ case PERMISSIVE:
+ case ENFORCING:
+ if (null != sslContext) {
+ ctx.pipeline()
+ .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
+ .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+ log.info("Handlers prepended to channel pipeline to establish SSL connection");
+ } else {
+ ctx.close();
+ log.error("Trying to establish a SSL connection but sslContext is null");
+ }
+ break;
+
+ default:
+ log.warn("Unknown TLS mode");
+ break;
+ }
+ } else if (sslMode == SslMode.ENFORCING) {
+ ctx.close();
+ log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
+ }
+
+ // reset the reader index so that handshake negotiation may proceed as normal.
+ msg.resetReaderIndex();
+
+ try {
+ // Remove this handler
+ ctx.pipeline().remove(this);
+ } catch (NoSuchElementException e) {
+ log.error("Error while removing HandshakeHandler", e);
+ }
+
+ // Hand over this message to the next .
+ ctx.fireChannelRead(msg.retain());
+ }
+ }
+
class NettyServerHandler extends SimpleChannelInboundHandler {
@Override
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 2e0a81e87a1c5fdf7de3a818b7d6eccaa2a73217..3300262c95d1f10bf44c91f9f7db88925af02e05 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.remoting.netty;
+import org.apache.rocketmq.remoting.common.SslMode;
+
public class NettySystemConfig {
public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
"com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
@@ -28,10 +30,16 @@ public class NettySystemConfig {
"com.rocketmq.remoting.clientAsyncSemaphoreValue";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
"com.rocketmq.remoting.clientOnewaySemaphoreValue";
- public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
- Boolean
- .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
- public static final int CLIENT_ASYNC_SEMAPHORE_VALUE =
+
+ public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE = //
+ "org.apache.rocketmq.remoting.ssl.mode";
+
+ public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE = //
+ "org.apache.rocketmq.remoting.ssl.config.file";
+
+ public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
+ Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
+ public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
@@ -39,4 +47,18 @@ public class NettySystemConfig {
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
public static int socketRcvbufSize =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
+
+ /**
+ * For server, three SSL modes are supported: disabled, permissive and enforcing.
+ *
+ *
disable: SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.
+ *
permissive: SSL is optional, aka, server in this mode can serve client connections with or without SSL;
+ *
enforcing: SSL is required, aka, non SSL connection will be rejected.
+ *
+ */
+ public static SslMode sslMode = //
+ SslMode.parse(System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE, "permissive"));
+
+ public static String sslConfigFile = //
+ System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE, "/etc/rocketmq/ssl.properties");
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
new file mode 100644
index 0000000000000000000000000000000000000000..ebadd9682525bcb897a8b2f8fd243b6833d608c9
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.cert.CertificateException;
+import java.util.Properties;
+import javax.net.ssl.SSLException;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SslHelper {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+ public static SslContext buildSslContext(boolean forClient) throws SSLException, CertificateException {
+
+ File configFile = new File(NettySystemConfig.sslConfigFile);
+ boolean testMode = !(configFile.exists() && configFile.isFile() && configFile.canRead());
+ Properties properties = null;
+
+ if (!testMode) {
+ properties = new Properties();
+ InputStream inputStream = null;
+ try {
+ inputStream = new FileInputStream(configFile);
+ properties.load(inputStream);
+ } catch (FileNotFoundException ignore) {
+ } catch (IOException ignore) {
+ } finally {
+ if (null != inputStream) {
+ try {
+ inputStream.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+ }
+
+ SslProvider provider = null;
+ if (OpenSsl.isAvailable()) {
+ provider = SslProvider.OPENSSL;
+ LOGGER.info("Using OpenSSL provider");
+ } else {
+ provider = SslProvider.JDK;
+ LOGGER.info("Using JDK SSL provider");
+ }
+
+ if (forClient) {
+ if (testMode) {
+ return SslContextBuilder
+ .forClient()
+ .sslProvider(SslProvider.JDK)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .build();
+ } else {
+ SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);
+
+ if ("false".equals(properties.getProperty("client.auth.server"))) {
+ sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+ } else {
+ if (properties.containsKey("client.trustManager")) {
+ sslContextBuilder.trustManager(new File(properties.getProperty("client.trustManager")));
+ }
+ }
+
+ return sslContextBuilder.keyManager(
+ properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null,
+ properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.keyFile")) : null,
+ properties.containsKey("client.password") ? properties.getProperty("client.password") : null)
+ .build();
+ }
+ } else {
+
+ if (testMode) {
+ SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
+ return SslContextBuilder
+ .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
+ .sslProvider(SslProvider.JDK)
+ .clientAuth(ClientAuth.OPTIONAL)
+ .build();
+ } else {
+ return SslContextBuilder.forServer(
+ properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null,
+ properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.keyFile")) : null,
+ properties.containsKey("server.password") ? properties.getProperty("server.password") : null)
+ .sslProvider(provider)
+ .trustManager(new File(properties.getProperty("server.trustManager")))
+ .clientAuth(parseClientAuthMode(properties.getProperty("server.auth.client")))
+ .build();
+ }
+ }
+ }
+
+ private static ClientAuth parseClientAuthMode(String authMode) {
+ if (null == authMode || authMode.trim().isEmpty()) {
+ return ClientAuth.NONE;
+ }
+
+ if ("optional".equalsIgnoreCase(authMode)) {
+ return ClientAuth.OPTIONAL;
+ }
+
+ return ClientAuth.REQUIRE;
+ }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..6c7327f258ed8013f2804ba49ac5c19e0c2a8b56
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.FileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+import java.util.UUID;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FileRegionEncoderTest {
+
+ /**
+ * This unit test case ensures that {@link FileRegionEncoder} indeed wraps {@link FileRegion} to
+ * {@link ByteBuf}.
+ * @throws IOException if there is an error.
+ */
+ @Test
+ public void testEncode() throws IOException {
+ FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
+ EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
+ File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
+ file.deleteOnExit();
+ Random random = new Random(System.currentTimeMillis());
+ int dataLength = 1 << 10;
+ byte[] data = new byte[dataLength];
+ random.nextBytes(data);
+ write(file, data);
+ FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
+ Assert.assertEquals(0, fileRegion.transfered());
+ Assert.assertEquals(dataLength, fileRegion.count());
+ Assert.assertTrue(channel.writeOutbound(fileRegion));
+ ByteBuf out = (ByteBuf) channel.readOutbound();
+ byte[] arr = new byte[out.readableBytes()];
+ out.getBytes(0, arr);
+ Assert.assertArrayEquals("Data should be identical", data, arr);
+ }
+
+ /**
+ * Write byte array to the specified file.
+ *
+ * @param file File to write to.
+ * @param data byte array to write.
+ * @throws IOException in case there is an exception.
+ */
+ private static void write(File file, byte[] data) throws IOException {
+ BufferedOutputStream bufferedOutputStream = null;
+ try {
+ bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(file, false));
+ bufferedOutputStream.write(data);
+ bufferedOutputStream.flush();
+ } finally {
+ if (null != bufferedOutputStream) {
+ bufferedOutputStream.close();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
index 7225cd6bd46203f84070a8a663e531013ef5c32f..ce739be591452f733f7f8def22898dd8a29a7834 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -29,8 +29,8 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
protected DefaultMQPushConsumer consumer = null;
public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
- String consumerGroup, AbstractListener listner) {
- super(nsAddr, topic, subExpression, consumerGroup, listner);
+ String consumerGroup, AbstractListener listener) {
+ super(nsAddr, topic, subExpression, consumerGroup, listener);
}
public AbstractListener getListener() {
@@ -42,6 +42,10 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
}
public void create() {
+ create(false);
+ }
+
+ public void create(boolean useTLS) {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr);
@@ -52,6 +56,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
e.printStackTrace();
}
consumer.setMessageListener(listener);
+ consumer.setUseTLS(useTLS);
}
public void start() {
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
index 26b77fe846c7d002f8e5bad0e60dc63d55056789..42949339e33d1c4f24fc82bbadf54424a168c854 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -34,20 +34,29 @@ public class RMQNormalProducer extends AbstractMQProducer {
private String nsAddr = null;
public RMQNormalProducer(String nsAddr, String topic) {
+ this(nsAddr, topic, false);
+ }
+
+ public RMQNormalProducer(String nsAddr, String topic, boolean useTLS) {
super(topic);
this.nsAddr = nsAddr;
- create();
+ create(useTLS);
start();
}
public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
String producerInstanceName) {
+ this(nsAddr, topic, producerGroupName, producerInstanceName, false);
+ }
+
+ public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
+ String producerInstanceName, boolean useTLS) {
super(topic);
this.producerGroupName = producerGroupName;
this.producerInstanceName = producerInstanceName;
this.nsAddr = nsAddr;
- create();
+ create(useTLS);
start();
}
@@ -59,17 +68,18 @@ public class RMQNormalProducer extends AbstractMQProducer {
this.producer = producer;
}
- protected void create() {
+ protected void create(boolean useTLS) {
producer = new DefaultMQProducer();
producer.setProducerGroup(getProducerGroupName());
producer.setInstanceName(getProducerInstanceName());
+ producer.setUseTLS(useTLS);
if (nsAddr != null) {
producer.setNamesrvAddr(nsAddr);
}
-
}
+
public void start() {
try {
producer.start();
@@ -83,10 +93,10 @@ public class RMQNormalProducer extends AbstractMQProducer {
public SendResult send(Object msg, Object orderKey) {
org.apache.rocketmq.client.producer.SendResult metaqResult = null;
- Message metaqMsg = (Message) msg;
+ Message message = (Message) msg;
try {
long start = System.currentTimeMillis();
- metaqResult = producer.send(metaqMsg);
+ metaqResult = producer.send(message);
this.msgRTs.addData(System.currentTimeMillis() - start);
if (isDebug) {
logger.info(metaqResult);
@@ -94,9 +104,9 @@ public class RMQNormalProducer extends AbstractMQProducer {
sendResult.setMsgId(metaqResult.getMsgId());
sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
- msgBodys.addData(new String(metaqMsg.getBody()));
+ msgBodys.addData(new String(message.getBody()));
originMsgs.addData(msg);
- originMsgIndex.put(new String(metaqMsg.getBody()), metaqResult);
+ originMsgIndex.put(new String(message.getBody()), metaqResult);
} catch (Exception e) {
if (isDebug) {
e.printStackTrace();
diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
index 22193bb4ba9cd6ffc97747195740264990dcc0dc..5681ecc841aedaf92831fdf84fbbc265c54eb3e8 100644
--- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
@@ -69,8 +69,8 @@ public abstract class AbstractMQConsumer implements MQConsumer {
return listener;
}
- public void setListener(AbstractListener listener) {
- this.listener = listener;
+ public void setListener(AbstractListener listner) {
+ this.listener = listner;
}
public String getNsAddr() {
diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java
index aaa4b27c32347169eccf886dd87af97bef0dbcfe..0fc2e963d01ab3d1d162f2e9086be10f6fc9b3d8 100644
--- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.test.clientinterface;
public interface MQConsumer {
void create();
+ void create(boolean useTLS);
+
void start();
void shutdown();
diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index 7dd747f8ab745fad1ed28b20d7295e9acb76401d..0f94f3606c547c9b5f4c3cf529ffb4e63f208cb0 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -27,10 +27,16 @@ public class ConsumerFactory {
public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
- AbstractListener listner) {
+ AbstractListener listener) {
+ return getRMQNormalConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
+ }
+
+ public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup,
+ String topic, String subExpression,
+ AbstractListener listener, boolean useTLS) {
RMQNormalConsumer consumer = new RMQNormalConsumer(nsAddr, topic, subExpression,
- consumerGroup, listner);
- consumer.create();
+ consumerGroup, listener);
+ consumer.create(useTLS);
consumer.start();
return consumer;
}
diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
similarity index 85%
rename from test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java
rename to test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
index 679f8bd5d937824d9f668e8df9be6c56041c5c80..07de524a427797074d93b077c4fc95b40880d161 100644
--- a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java
+++ b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
@@ -25,24 +25,24 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.listener.AbstractListener;
-public class RMQNormalListner extends AbstractListener implements MessageListenerConcurrently {
+public class RMQNormalListener extends AbstractListener implements MessageListenerConcurrently {
private ConsumeConcurrentlyStatus consumeStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
private AtomicInteger msgIndex = new AtomicInteger(0);
- public RMQNormalListner() {
+ public RMQNormalListener() {
super();
}
- public RMQNormalListner(String listnerName) {
- super(listnerName);
+ public RMQNormalListener(String listenerName) {
+ super(listenerName);
}
- public RMQNormalListner(ConsumeConcurrentlyStatus consumeStatus) {
+ public RMQNormalListener(ConsumeConcurrentlyStatus consumeStatus) {
super();
this.consumeStatus = consumeStatus;
}
- public RMQNormalListner(String originMsgCollector, String msgBodyCollector) {
+ public RMQNormalListener(String originMsgCollector, String msgBodyCollector) {
super(originMsgCollector, msgBodyCollector);
}
@@ -51,7 +51,7 @@ public class RMQNormalListner extends AbstractListener implements MessageListene
for (MessageExt msg : msgs) {
msgIndex.getAndIncrement();
if (isDebug) {
- if (listenerName != null && listenerName != "") {
+ if (listenerName != null && !listenerName.isEmpty()) {
logger.info(listenerName + ":" + msgIndex.get() + ":"
+ String.format("msgid:%s broker:%s queueId:%s offset:%s",
msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(),
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 8516779e4363149ac5d206113c553297978f60d4..5027a3cce070397c1d49a51891ef6b9b16538fed 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -31,8 +31,6 @@ import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
-import org.apache.rocketmq.test.util.TestUtils;
-import org.junit.Assert;
public class BaseConf {
protected static String nsAddr;
@@ -82,7 +80,11 @@ public class BaseConf {
}
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
- RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic);
+ return getProducer(nsAddr, topic, false);
+ }
+
+ public static RMQNormalProducer getProducer(String nsAddr, String topic, boolean useTLS) {
+ RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, useTLS);
if (debug) {
producer.setDebug();
}
@@ -111,15 +113,25 @@ public class BaseConf {
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
- AbstractListener listner) {
+ AbstractListener listener) {
+ return getConsumer(nsAddr, topic, subExpression, listener, false);
+ }
+
+ public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
+ AbstractListener listener, boolean useTLS) {
String consumerGroup = initConsumerGroup();
- return getConsumer(nsAddr, consumerGroup, topic, subExpression, listner);
+ return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS);
+ }
+
+ public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
+ String subExpression, AbstractListener listener) {
+ return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
- String subExpression, AbstractListener listner) {
+ String subExpression, AbstractListener listener, boolean useTLS) {
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
- topic, subExpression, listner);
+ topic, subExpression, listener, useTLS);
if (debug) {
consumer.setDebug();
}
@@ -129,7 +141,7 @@ public class BaseConf {
return consumer;
}
- public static void shutDown() {
+ public static void shutdown() {
try {
for (Object mqClient : mqClients) {
if (mqClient instanceof AbstractMQProducer) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
index 835f746fd54d88d9d000978ac4418bbaf1097373..ef126213fce4a04d5b9522e0f4664d604e03f843 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -45,15 +45,15 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
public void testTwoConsumerAndCrashOne() {
int msgSize = 400;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
producer.send(msgSize);
@@ -79,11 +79,11 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
@Test
public void test3ConsumerAndCrashOne() {
int msgSize = 400;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
producer.send(msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java
index 9c9b25498acfba114d3ca1dfa2d9c82fa6d973f0..4c31f6a6cef7fb55c1db6b7a3a6520be1f56974e 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
@@ -46,15 +46,15 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
public void testTwoConsumersBalance() {
int msgSize = 400;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
@@ -75,13 +75,13 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
@Test
public void testFourConsumersBalance() {
int msgSize = 600;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java
index 41408b82e9ea59a1315bf6a4d7a66ec511dfecbc..282a2487018556e15e060d0b5729b52fa7eaf377 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
@@ -46,7 +46,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -55,7 +55,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
- new RMQNormalListner(group + "_1"));
+ new RMQNormalListener(group + "_1"));
Thread.sleep(3000);
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
@@ -66,7 +66,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
.containsExactlyElementsIn(producer.getAllMsgBody());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
- consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
+ consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), waitTime);
assertThat(consumer2.getListener().getAllMsgBody().size()).isEqualTo(0);
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java
index fc0cfce8d15bb8a3da142c82e7760fda134a4f63..977e61fb022b7c4dcc3cfdc86980e45153b54292 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -56,9 +56,9 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
- new RMQNormalListner(group + "_1"));
+ new RMQNormalListener(group + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
- consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
+ consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java
index 0d3f6fbba5ce3efc9517d69c6d7d03bab3a578b2..1178b7b8c84811ba85ee66411209cd40b5406d4d 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java
@@ -22,7 +22,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -55,10 +55,10 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT {
int msgSize = 16;
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*",
- new RMQNormalListner());
+ new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*",
- new RMQNormalListner(ConsumeConcurrentlyStatus.RECONSUME_LATER));
+ new RMQNormalListener(ConsumeConcurrentlyStatus.RECONSUME_LATER));
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java
index dd1db0b043e580fea7a32dc7a7e3e77f4f407614..987b4b29f8f7e6a959a570b40e5fe52720bde391 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -56,7 +56,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
- new RMQNormalListner(group + "_1"));
+ new RMQNormalListener(group + "_1"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
@@ -71,7 +71,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
consumer1.clearMsg();
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
- consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
+ consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java
index 4fb5005f3af9c454f80303bf7a73d7a5a125790d..019bcbb1ab0c16341ae4ee29842b01fd51f1498b 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -47,7 +47,7 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -57,9 +57,9 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT {
String group1 = initConsumerGroup();
String group2 = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group1, topic, "*",
- new RMQNormalListner(group1 + "_1"));
+ new RMQNormalListener(group1 + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, group2, topic, "*",
- new RMQNormalListner(group2 + "_2"));
+ new RMQNormalListener(group2 + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java
index 76e69b3777ed64ac80464be1e298ae79e2731728..4c9e491e248252437f86e505a51a78ac2291f359 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -47,7 +47,7 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -56,9 +56,9 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
- new RMQNormalListner(group + "_1"));
+ new RMQNormalListener(group + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
- consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
+ consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
index 9d8aeb3313ec20eb71a2a6b556318673134f79dd..9294c3fd93411e0226c4e6a09fe2f17a33dabd1c 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
@@ -49,7 +49,7 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java
index 0d3b1ca1e9f83adacebb29ce174bc7bae2aa886a..a83270b94a66a79e03452f7ced31dde0a5d52580 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -55,9 +55,9 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT {
String tag2 = "jueyin_tag_2";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag1,
- new RMQNormalListner());
+ new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
- consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListner());
+ consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag2, msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java
index 155a0aa00d22c3de1d618323e09e8ac8c388810c..9ffe05c3c3df765c61d559aaf21bda66e615c6c1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -54,9 +54,9 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT {
String tag = "jueyin_tag";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*",
- new RMQNormalListner());
+ new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
- consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner());
+ consumer1.getConsumerGroup(), topic, tag, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag, msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java
index e89464f7eabca24458d5f17b3331b6491701a81e..1c86a26b6eab4ddee5cb2508ac4c1f3626630016 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -54,9 +54,9 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT {
String tag = "jueyin_tag";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag,
- new RMQNormalListner());
+ new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
- consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner());
+ consumer1.getConsumerGroup(), topic, tag, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag, msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java
index 23248e3daf538b9289114f9f9b4c8f4e86711dc3..972666afbd6ec726065f29ea4ae1e3921ea07d09 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
@@ -46,20 +46,20 @@ public class DynamicAddAndCrashIT extends BaseConf {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
public void testAddOneConsumerAndCrashAfterWhile() {
int msgSize = 150;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
consumer2.shutdown();
@@ -76,16 +76,16 @@ public class DynamicAddAndCrashIT extends BaseConf {
@Test
public void testAddTwoConsumerAndCrashAfterWhile() {
int msgSize = 150;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
consumer2.shutdown();
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java
index 9ef79534d251280830897723bfe210e7a6598bb3..279ee8c3a1fe0606a0f371593c4c9201e49a1b68 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
@@ -46,20 +46,20 @@ public class DynamicAddConsumerIT extends BaseConf {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
public void testAddOneConsumer() {
int msgSize = 100;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
asyncDefaultMQProducer.waitSendAll(waitTime * 6);
@@ -74,16 +74,16 @@ public class DynamicAddConsumerIT extends BaseConf {
@Test
public void testAddTwoConsumer() {
int msgSize = 100;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
asyncDefaultMQProducer.waitSendAll(waitTime * 6);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java
index 1323720664b4500f379814c2e78fe2f5b90acdc7..68d9198cae1a3a35c6795db32928e697c097b5f0 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
@@ -46,15 +46,15 @@ public class DynamicCrashConsumerIT extends BaseConf {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
public void testAddOneConsumer() {
int msgSize = 100;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
@@ -75,11 +75,11 @@ public class DynamicCrashConsumerIT extends BaseConf {
@Test
public void testAddTwoConsumer() {
int msgSize = 100;
- RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+ RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
- "*", new RMQNormalListner());
+ "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
index 15d91a1423ce14cb6fe6c48a7b406c09e1ceb3a3..79f15dcc99da9ea04b20c756ffc12ddc03199b53 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
@@ -49,7 +49,7 @@ public class SqlFilterIT extends BaseConf {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -58,7 +58,7 @@ public class SqlFilterIT extends BaseConf {
String group = initConsumerGroup();
MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
- RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListner(group + "_1"));
+ RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListener(group + "_1"));
Thread.sleep(3000);
producer.send("TagA", msgSize);
producer.send("TagB", msgSize);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java
index 37ccb4d204aeebd58c8d62464d349b288c432281..564da5ceaf1cf701ea91639f6fb780de126bc76b 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.factory.TagMessage;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
@@ -49,7 +49,7 @@ public class MulTagSubIT extends BaseConf {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
@@ -58,7 +58,7 @@ public class MulTagSubIT extends BaseConf {
String subExpress = String.format("%s||jueyin2", tag);
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
@@ -75,7 +75,7 @@ public class MulTagSubIT extends BaseConf {
String subExpress = String.format("%s||noExistTag", tag2);
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(tag1, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
@@ -98,7 +98,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
@@ -119,7 +119,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
@@ -141,7 +141,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java
index 1952f3056e91c2a0702ea77c1a277987e2f7cff1..423baae8e6240c6904a0fa409b06d46b8cf7f9ec 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
@@ -47,14 +47,14 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
@After
public void tearDown() {
- super.shutDown();
+ super.shutdown();
}
@Test
public void testTagSmoke() {
String tag = "jueyin";
int msgSize = 10;
- RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListner());
+ RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
@@ -68,7 +68,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExprress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExprress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
@@ -84,7 +84,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
@@ -100,7 +100,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
@@ -116,7 +116,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = null;
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
@@ -133,7 +133,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
- new RMQNormalListner());
+ new RMQNormalListener());
List