From a1764ab01b613bb49dd80bed7cd43bb850d183cb Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Thu, 12 Oct 2017 17:32:07 +0800 Subject: [PATCH] [ROCKETMQ-28] Encrypt transmission layer closes apache/rocketmq#118 --- .../apache/rocketmq/broker/BrokerStartup.java | 2 + client/pom.xml | 6 + .../apache/rocketmq/client/ClientConfig.java | 14 +- .../client/impl/factory/MQClientInstance.java | 1 + distribution/bin/runbroker.sh | 2 +- distribution/bin/runserver.sh | 2 +- pom.xml | 2 +- remoting/pom.xml | 17 +++ .../rocketmq/remoting/RemotingClient.java | 14 +- .../rocketmq/remoting/common/SslMode.java | 53 +++++++ .../remoting/netty/FileRegionEncoder.java | 78 ++++++++++ .../remoting/netty/NettyClientConfig.java | 10 ++ .../remoting/netty/NettyRemotingAbstract.java | 7 + .../remoting/netty/NettyRemotingClient.java | 41 +++++- .../remoting/netty/NettyRemotingServer.java | 102 +++++++++++++- .../remoting/netty/NettySystemConfig.java | 30 +++- .../rocketmq/remoting/netty/SslHelper.java | 133 ++++++++++++++++++ .../remoting/netty/FileRegionEncoderTest.java | 80 +++++++++++ .../test/client/rmq/RMQNormalConsumer.java | 9 +- .../test/client/rmq/RMQNormalProducer.java | 26 ++-- .../clientinterface/AbstractMQConsumer.java | 4 +- .../test/clientinterface/MQConsumer.java | 2 + .../test/factory/ConsumerFactory.java | 12 +- ...malListner.java => RMQNormalListener.java} | 14 +- .../apache/rocketmq/test/base/BaseConf.java | 28 ++-- .../balance/NormalMsgDynamicBalanceIT.java | 14 +- .../balance/NormalMsgStaticBalanceIT.java | 16 +-- .../normal/BroadCastNormalMsgNotRecvIT.java | 8 +- .../normal/BroadCastNormalMsgRecvCrashIT.java | 8 +- .../normal/BroadCastNormalMsgRecvFailIT.java | 8 +- .../BroadCastNormalMsgRecvStartLaterIT.java | 8 +- .../BroadCastNormalMsgTwoDiffGroupRecvIT.java | 8 +- .../NormalMsgTwoSameGroupConsumerIT.java | 8 +- .../broadcast/order/OrderMsgBroadCastIT.java | 2 +- .../tag/BroadCastTwoConsumerFilterIT.java | 8 +- .../tag/BroadCastTwoConsumerSubDiffTagIT.java | 8 +- .../tag/BroadCastTwoConsumerSubTagIT.java | 8 +- .../cluster/DynamicAddAndCrashIT.java | 14 +- .../cluster/DynamicAddConsumerIT.java | 14 +- .../cluster/DynamicCrashConsumerIT.java | 14 +- .../client/consumer/filter/SqlFilterIT.java | 6 +- .../test/client/consumer/tag/MulTagSubIT.java | 14 +- .../tag/TagMessageWith1ConsumerIT.java | 20 +-- .../tag/TagMessageWithMulConsumerIT.java | 24 ++-- .../TagMessageWithSameGroupConsumerIT.java | 16 +-- .../consumer/topic/MulConsumerMulTopicIT.java | 16 +-- .../consumer/topic/OneConsumerMulTopicIT.java | 10 +- .../producer/async/AsyncSendExceptionIT.java | 2 +- .../async/AsyncSendWithMessageQueueIT.java | 6 +- .../AsyncSendWithMessageQueueSelectorIT.java | 6 +- .../AsyncSendWithOnlySendCallBackIT.java | 6 +- .../client/producer/batch/BatchSendIT.java | 2 +- .../exception/msg/MessageUserPropIT.java | 8 +- ...roducerGroupAndInstanceNameValidityIT.java | 2 +- .../oneway/OneWaySendExceptionIT.java | 2 +- .../client/producer/oneway/OneWaySendIT.java | 6 +- .../producer/oneway/OneWaySendWithMQIT.java | 6 +- .../oneway/OneWaySendWithSelectorIT.java | 6 +- .../order/OrderMsgDynamicRebalanceIT.java | 2 +- .../client/producer/order/OrderMsgIT.java | 2 +- .../producer/order/OrderMsgRebalanceIT.java | 2 +- .../producer/order/OrderMsgWithTagIT.java | 2 +- .../querymsg/QueryMsgByIdExceptionIT.java | 2 +- .../producer/querymsg/QueryMsgByIdIT.java | 6 +- .../producer/querymsg/QueryMsgByKeyIT.java | 2 +- .../rocketmq/test/delay/NormalMsgDelayIT.java | 2 +- .../smoke/NormalMessageSendAndRecvIT.java | 6 +- .../org/apache/rocketmq/test/tls/TLS_IT.java | 60 ++++++++ .../apache/rocketmq/test/tls/TLS_Mix2_IT.java | 61 ++++++++ .../apache/rocketmq/test/tls/TLS_Mix_IT.java | 62 ++++++++ 70 files changed, 978 insertions(+), 224 deletions(-) create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java rename test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/{RMQNormalListner.java => RMQNormalListener.java} (85%) create mode 100644 test/src/test/java/org/apache/rocketmq/test/tls/TLS_IT.java create mode 100644 test/src/test/java/org/apache/rocketmq/test/tls/TLS_Mix2_IT.java create mode 100644 test/src/test/java/org/apache/rocketmq/test/tls/TLS_Mix_IT.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 94ebe4f1..e9237b67 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.common.SslMode; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettySystemConfig; @@ -97,6 +98,7 @@ public class BrokerStartup { final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); + nettyClientConfig.setUseTLS(NettySystemConfig.sslMode != SslMode.DISABLED); nettyServerConfig.setListenPort(10911); final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); diff --git a/client/pom.xml b/client/pom.xml index 2f3fb133..3f5c9617 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -36,6 +36,12 @@ ${project.groupId} rocketmq-common + + + io.netty + netty-tcnative + + org.slf4j diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 950d7562..8f255f01 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -45,6 +45,8 @@ public class ClientConfig { private String unitName; private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); + private boolean useTLS; + public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); @@ -92,6 +94,7 @@ public class ClientConfig { this.unitMode = cc.unitMode; this.unitName = cc.unitName; this.vipChannelEnabled = cc.vipChannelEnabled; + this.useTLS = cc.useTLS; } public ClientConfig cloneClientConfig() { @@ -106,6 +109,7 @@ public class ClientConfig { cc.unitMode = unitMode; cc.unitName = unitName; cc.vipChannelEnabled = vipChannelEnabled; + cc.useTLS = useTLS; return cc; } @@ -173,12 +177,20 @@ public class ClientConfig { this.vipChannelEnabled = vipChannelEnabled; } + public boolean isUseTLS() { + return useTLS; + } + + public void setUseTLS(boolean useTLS) { + this.useTLS = useTLS; + } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" - + vipChannelEnabled + "]"; + + vipChannelEnabled + ", useTLS=" + useTLS + "]"; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 31c2c3cc..365fca9f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -127,6 +127,7 @@ public class MQClientInstance { this.instanceIndex = instanceIndex; this.nettyClientConfig = new NettyClientConfig(); this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads()); + this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); this.clientRemotingProcessor = new ClientRemotingProcessor(this); this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh index b0c490e9..74a28d15 100644 --- a/distribution/bin/runbroker.sh +++ b/distribution/bin/runbroker.sh @@ -44,7 +44,7 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" -JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib" +JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" diff --git a/distribution/bin/runserver.sh b/distribution/bin/runserver.sh index 7c41b0c2..cc4c8d7e 100644 --- a/distribution/bin/runserver.sh +++ b/distribution/bin/runserver.sh @@ -41,7 +41,7 @@ JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" -JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib" +JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" diff --git a/pom.xml b/pom.xml index ab90868e..e317472c 100644 --- a/pom.xml +++ b/pom.xml @@ -550,7 +550,7 @@ io.netty netty-all - 4.0.36.Final + 4.0.42.Final com.alibaba diff --git a/remoting/pom.xml b/remoting/pom.xml index 05b4274d..61f0286a 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -45,5 +45,22 @@ org.slf4j slf4j-api + + + io.netty + netty-tcnative + 1.1.33.Fork22 + ${os.detected.classifier} + + + + + + kr.motd.maven + os-maven-plugin + 1.4.0.Final + + + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index 276a5653..b527408e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -27,24 +27,24 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface RemotingClient extends RemotingService { - public void updateNameServerAddressList(final List addrs); + void updateNameServerAddressList(final List addrs); - public List getNameServerAddressList(); + List getNameServerAddressList(); - public RemotingCommand invokeSync(final String addr, final RemotingCommand request, + RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException; - public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, + void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; - public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) + void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; - public void registerProcessor(final int requestCode, final NettyRequestProcessor processor, + void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); - public boolean isChannelWriteable(final String addr); + boolean isChannelWritable(final String addr); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java new file mode 100644 index 00000000..8801736b --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.common; + +/** + * For server, three SSL modes are supported: disabled, permissive and enforcing. + *
    + *
  1. disable: SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.
  2. + *
  3. permissive: SSL is optional, aka, server in this mode can serve client connections with or without SSL;
  4. + *
  5. enforcing: SSL is required, aka, non SSL connection will be rejected.
  6. + *
+ */ +public enum SslMode { + + DISABLED("disabled"), + PERMISSIVE("permissive"), + ENFORCING("enforcing"); + + private String name; + + SslMode(String name) { + this.name = name; + } + + public static SslMode parse(String mode) { + for (SslMode sslMode: SslMode.values()) { + if (sslMode.name.equals(mode)) { + return sslMode; + } + } + + return PERMISSIVE; + } + + public String getName() { + return name; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java new file mode 100644 index 00000000..2bd15aea --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.FileRegion; +import io.netty.handler.codec.MessageToByteEncoder; + +import io.netty.handler.ssl.SslHandler; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +/** + *

+ * By default, file region are directly transferred to socket channel which is known as zero copy. In case we need + * to encrypt transmission, data being sent should go through the {@link SslHandler}. This encoder ensures this + * process. + *

+ */ +public class FileRegionEncoder extends MessageToByteEncoder { + + /** + * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that + * can be handled by this encoder. + * + * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link + * io.netty.handler.codec.MessageToByteEncoder} belongs to + * @param msg the message to encode + * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written + * @throws Exception is thrown if an error occurs + */ + @Override + protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception { + WritableByteChannel writableByteChannel = new WritableByteChannel() { + @Override + public int write(ByteBuffer src) throws IOException { + out.writeBytes(src); + return out.capacity(); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() throws IOException { + } + }; + + long toTransfer = msg.count(); + + while (true) { + long transferred = msg.transfered(); + if (toTransfer - transferred <= 0) { + break; + } + msg.transferTo(writableByteChannel, transferred); + } + } +} \ No newline at end of file diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java index 9edaa542..fbc071b2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java @@ -38,6 +38,8 @@ public class NettyClientConfig { private boolean clientPooledByteBufAllocatorEnable = false; private boolean clientCloseSocketIfTimeout = false; + private boolean useTLS; + public boolean isClientCloseSocketIfTimeout() { return clientCloseSocketIfTimeout; } @@ -125,4 +127,12 @@ public class NettyClientConfig { public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) { this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable; } + + public boolean isUseTLS() { + return useTLS; + } + + public void setUseTLS(boolean useTLS) { + this.useTLS = useTLS; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 61434628..06918086 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -20,6 +20,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; import java.net.SocketAddress; import java.util.HashMap; import java.util.Iterator; @@ -88,6 +90,11 @@ public abstract class NettyRemotingAbstract { */ protected Pair defaultRequestProcessor; + /** + * SSL context via which to create {@link SslHandler}. + */ + protected SslContext sslContext; + /** * Constructor, specifying capacity of one-way and asynchronous semaphores. * diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 34f560fd..a96423c1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; @@ -34,6 +35,7 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.net.SocketAddress; +import java.security.cert.CertificateException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import javax.net.ssl.SSLException; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -120,6 +123,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); } }); + + if (nettyClientConfig.isUseTLS()) { + try { + sslContext = SslHelper.buildSslContext(true); + log.info("SSL enabled for client"); + } catch (SSLException e) { + log.error("Failed to create SSLContext", e); + } catch (CertificateException e) { + log.error("Failed to create SSLContext", e); + throw new RuntimeException("Failed to create SSLContext", e); + } + } } private static int initValueIndex() { @@ -151,7 +166,16 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( + ChannelPipeline pipeline = ch.pipeline(); + if (nettyClientConfig.isUseTLS()) { + if (null != sslContext) { + pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); + log.info("Prepend SSL handler"); + } else { + log.warn("Connections are insecure as SSLContext is null!"); + } + } + pipeline.addLast( defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), @@ -421,17 +445,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private Channel createChannel(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + cw.getChannel().close(); + channelTables.remove(addr); } if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - boolean createNewConnection = false; + boolean createNewConnection; cw = this.channelTables.get(addr); if (cw != null) { if (cw.isOK()) { - return cw.getChannel(); + cw.getChannel().close(); + this.channelTables.remove(addr); + createNewConnection = true; } else if (!cw.getChannelFuture().isDone()) { createNewConnection = false; } else { @@ -530,10 +557,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } @Override - public boolean isChannelWriteable(String addr) { + public boolean isChannelWritable(String addr) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.isWriteable(); + return cw.isWritable(); } return true; } @@ -569,7 +596,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return this.channelFuture.channel() != null && this.channelFuture.channel().isActive(); } - public boolean isWriteable() { + public boolean isWritable() { return this.channelFuture.channel().isWritable(); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 7cf82c96..ec1927a6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.netty; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -37,12 +38,15 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.net.InetSocketAddress; +import java.security.cert.CertificateException; +import java.util.NoSuchElementException; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import javax.net.ssl.SSLException; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -50,6 +54,7 @@ import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.common.SslMode; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; @@ -74,6 +79,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private int port = 0; + private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; + private static final String TLS_HANDLER_NAME = "sslHandler"; + private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; + public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { this(nettyServerConfig, null); } @@ -129,6 +138,20 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } }); } + + SslMode sslMode = NettySystemConfig.sslMode; + log.info("Server is running in TLS {} mode", sslMode.getName()); + + if (sslMode != SslMode.DISABLED) { + try { + sslContext = SslHelper.buildSslContext(false); + log.info("SSLContext created for server"); + } catch (CertificateException e) { + log.error("Failed to create SSLContext for server", e); + } catch (SSLException e) { + log.error("Failed to create SSLContext for server", e); + } + } } private boolean useEpoll() { @@ -164,13 +187,16 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - defaultEventExecutorGroup, - new NettyEncoder(), - new NettyDecoder(), - new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - new NettyConnectManageHandler(), - new NettyServerHandler()); + ch.pipeline() + .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, + new HandshakeHandler(NettySystemConfig.sslMode)) + .addLast(defaultEventExecutorGroup, + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + new NettyConnectManageHandler(), + new NettyServerHandler() + ); } }); @@ -298,6 +324,68 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return this.publicExecutor; } + class HandshakeHandler extends SimpleChannelInboundHandler { + + private final SslMode sslMode; + + private static final byte HANDSHAKE_MAGIC_CODE = 0x16; + + HandshakeHandler(SslMode sslMode) { + this.sslMode = sslMode; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + + // mark the current position so that we can peek the first byte to determine if the content is starting with + // TLS handshake + msg.markReaderIndex(); + + byte b = msg.getByte(0); + + if (b == HANDSHAKE_MAGIC_CODE) { + switch (sslMode) { + case DISABLED: + ctx.close(); + log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode"); + break; + case PERMISSIVE: + case ENFORCING: + if (null != sslContext) { + ctx.pipeline() + .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc())) + .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder()); + log.info("Handlers prepended to channel pipeline to establish SSL connection"); + } else { + ctx.close(); + log.error("Trying to establish a SSL connection but sslContext is null"); + } + break; + + default: + log.warn("Unknown TLS mode"); + break; + } + } else if (sslMode == SslMode.ENFORCING) { + ctx.close(); + log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode"); + } + + // reset the reader index so that handshake negotiation may proceed as normal. + msg.resetReaderIndex(); + + try { + // Remove this handler + ctx.pipeline().remove(this); + } catch (NoSuchElementException e) { + log.error("Error while removing HandshakeHandler", e); + } + + // Hand over this message to the next . + ctx.fireChannelRead(msg.retain()); + } + } + class NettyServerHandler extends SimpleChannelInboundHandler { @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java index 2e0a81e8..3300262c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.remoting.netty; +import org.apache.rocketmq.remoting.common.SslMode; + public class NettySystemConfig { public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable"; @@ -28,10 +30,16 @@ public class NettySystemConfig { "com.rocketmq.remoting.clientAsyncSemaphoreValue"; public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = "com.rocketmq.remoting.clientOnewaySemaphoreValue"; - public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = - Boolean - .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); - public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = + + public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE = // + "org.apache.rocketmq.remoting.ssl.mode"; + + public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE = // + "org.apache.rocketmq.remoting.ssl.config.file"; + + public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // + Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); + public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = // Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535")); public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535")); @@ -39,4 +47,18 @@ public class NettySystemConfig { Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); public static int socketRcvbufSize = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); + + /** + * For server, three SSL modes are supported: disabled, permissive and enforcing. + *
    + *
  1. disable: SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.
  2. + *
  3. permissive: SSL is optional, aka, server in this mode can serve client connections with or without SSL;
  4. + *
  5. enforcing: SSL is required, aka, non SSL connection will be rejected.
  6. + *
+ */ + public static SslMode sslMode = // + SslMode.parse(System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE, "permissive")); + + public static String sslConfigFile = // + System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE, "/etc/rocketmq/ssl.properties"); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java new file mode 100644 index 00000000..ebadd968 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.cert.CertificateException; +import java.util.Properties; +import javax.net.ssl.SSLException; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SslHelper { + + private static final Logger LOGGER = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + + public static SslContext buildSslContext(boolean forClient) throws SSLException, CertificateException { + + File configFile = new File(NettySystemConfig.sslConfigFile); + boolean testMode = !(configFile.exists() && configFile.isFile() && configFile.canRead()); + Properties properties = null; + + if (!testMode) { + properties = new Properties(); + InputStream inputStream = null; + try { + inputStream = new FileInputStream(configFile); + properties.load(inputStream); + } catch (FileNotFoundException ignore) { + } catch (IOException ignore) { + } finally { + if (null != inputStream) { + try { + inputStream.close(); + } catch (IOException ignore) { + } + } + } + } + + SslProvider provider = null; + if (OpenSsl.isAvailable()) { + provider = SslProvider.OPENSSL; + LOGGER.info("Using OpenSSL provider"); + } else { + provider = SslProvider.JDK; + LOGGER.info("Using JDK SSL provider"); + } + + if (forClient) { + if (testMode) { + return SslContextBuilder + .forClient() + .sslProvider(SslProvider.JDK) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + } else { + SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK); + + if ("false".equals(properties.getProperty("client.auth.server"))) { + sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); + } else { + if (properties.containsKey("client.trustManager")) { + sslContextBuilder.trustManager(new File(properties.getProperty("client.trustManager"))); + } + } + + return sslContextBuilder.keyManager( + properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null, + properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.keyFile")) : null, + properties.containsKey("client.password") ? properties.getProperty("client.password") : null) + .build(); + } + } else { + + if (testMode) { + SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); + return SslContextBuilder + .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()) + .sslProvider(SslProvider.JDK) + .clientAuth(ClientAuth.OPTIONAL) + .build(); + } else { + return SslContextBuilder.forServer( + properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null, + properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.keyFile")) : null, + properties.containsKey("server.password") ? properties.getProperty("server.password") : null) + .sslProvider(provider) + .trustManager(new File(properties.getProperty("server.trustManager"))) + .clientAuth(parseClientAuthMode(properties.getProperty("server.auth.client"))) + .build(); + } + } + } + + private static ClientAuth parseClientAuthMode(String authMode) { + if (null == authMode || authMode.trim().isEmpty()) { + return ClientAuth.NONE; + } + + if ("optional".equalsIgnoreCase(authMode)) { + return ClientAuth.OPTIONAL; + } + + return ClientAuth.REQUIRE; + } +} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java new file mode 100644 index 00000000..6c7327f2 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.FileRegion; +import io.netty.channel.embedded.EmbeddedChannel; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Random; +import java.util.UUID; +import org.junit.Assert; +import org.junit.Test; + +public class FileRegionEncoderTest { + + /** + * This unit test case ensures that {@link FileRegionEncoder} indeed wraps {@link FileRegion} to + * {@link ByteBuf}. + * @throws IOException if there is an error. + */ + @Test + public void testEncode() throws IOException { + FileRegionEncoder fileRegionEncoder = new FileRegionEncoder(); + EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder); + File file = File.createTempFile(UUID.randomUUID().toString(), ".data"); + file.deleteOnExit(); + Random random = new Random(System.currentTimeMillis()); + int dataLength = 1 << 10; + byte[] data = new byte[dataLength]; + random.nextBytes(data); + write(file, data); + FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength); + Assert.assertEquals(0, fileRegion.transfered()); + Assert.assertEquals(dataLength, fileRegion.count()); + Assert.assertTrue(channel.writeOutbound(fileRegion)); + ByteBuf out = (ByteBuf) channel.readOutbound(); + byte[] arr = new byte[out.readableBytes()]; + out.getBytes(0, arr); + Assert.assertArrayEquals("Data should be identical", data, arr); + } + + /** + * Write byte array to the specified file. + * + * @param file File to write to. + * @param data byte array to write. + * @throws IOException in case there is an exception. + */ + private static void write(File file, byte[] data) throws IOException { + BufferedOutputStream bufferedOutputStream = null; + try { + bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(file, false)); + bufferedOutputStream.write(data); + bufferedOutputStream.flush(); + } finally { + if (null != bufferedOutputStream) { + bufferedOutputStream.close(); + } + } + } +} \ No newline at end of file diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java index 7225cd6b..ce739be5 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java @@ -29,8 +29,8 @@ public class RMQNormalConsumer extends AbstractMQConsumer { protected DefaultMQPushConsumer consumer = null; public RMQNormalConsumer(String nsAddr, String topic, String subExpression, - String consumerGroup, AbstractListener listner) { - super(nsAddr, topic, subExpression, consumerGroup, listner); + String consumerGroup, AbstractListener listener) { + super(nsAddr, topic, subExpression, consumerGroup, listener); } public AbstractListener getListener() { @@ -42,6 +42,10 @@ public class RMQNormalConsumer extends AbstractMQConsumer { } public void create() { + create(false); + } + + public void create(boolean useTLS) { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setInstanceName(RandomUtil.getStringByUUID()); consumer.setNamesrvAddr(nsAddr); @@ -52,6 +56,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer { e.printStackTrace(); } consumer.setMessageListener(listener); + consumer.setUseTLS(useTLS); } public void start() { diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java index 26b77fe8..42949339 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java @@ -34,20 +34,29 @@ public class RMQNormalProducer extends AbstractMQProducer { private String nsAddr = null; public RMQNormalProducer(String nsAddr, String topic) { + this(nsAddr, topic, false); + } + + public RMQNormalProducer(String nsAddr, String topic, boolean useTLS) { super(topic); this.nsAddr = nsAddr; - create(); + create(useTLS); start(); } public RMQNormalProducer(String nsAddr, String topic, String producerGroupName, String producerInstanceName) { + this(nsAddr, topic, producerGroupName, producerInstanceName, false); + } + + public RMQNormalProducer(String nsAddr, String topic, String producerGroupName, + String producerInstanceName, boolean useTLS) { super(topic); this.producerGroupName = producerGroupName; this.producerInstanceName = producerInstanceName; this.nsAddr = nsAddr; - create(); + create(useTLS); start(); } @@ -59,17 +68,18 @@ public class RMQNormalProducer extends AbstractMQProducer { this.producer = producer; } - protected void create() { + protected void create(boolean useTLS) { producer = new DefaultMQProducer(); producer.setProducerGroup(getProducerGroupName()); producer.setInstanceName(getProducerInstanceName()); + producer.setUseTLS(useTLS); if (nsAddr != null) { producer.setNamesrvAddr(nsAddr); } - } + public void start() { try { producer.start(); @@ -83,10 +93,10 @@ public class RMQNormalProducer extends AbstractMQProducer { public SendResult send(Object msg, Object orderKey) { org.apache.rocketmq.client.producer.SendResult metaqResult = null; - Message metaqMsg = (Message) msg; + Message message = (Message) msg; try { long start = System.currentTimeMillis(); - metaqResult = producer.send(metaqMsg); + metaqResult = producer.send(message); this.msgRTs.addData(System.currentTimeMillis() - start); if (isDebug) { logger.info(metaqResult); @@ -94,9 +104,9 @@ public class RMQNormalProducer extends AbstractMQProducer { sendResult.setMsgId(metaqResult.getMsgId()); sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK)); sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName()); - msgBodys.addData(new String(metaqMsg.getBody())); + msgBodys.addData(new String(message.getBody())); originMsgs.addData(msg); - originMsgIndex.put(new String(metaqMsg.getBody()), metaqResult); + originMsgIndex.put(new String(message.getBody()), metaqResult); } catch (Exception e) { if (isDebug) { e.printStackTrace(); diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java index 22193bb4..5681ecc8 100644 --- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java @@ -69,8 +69,8 @@ public abstract class AbstractMQConsumer implements MQConsumer { return listener; } - public void setListener(AbstractListener listener) { - this.listener = listener; + public void setListener(AbstractListener listner) { + this.listener = listner; } public String getNsAddr() { diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java index aaa4b27c..0fc2e963 100644 --- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java @@ -20,6 +20,8 @@ package org.apache.rocketmq.test.clientinterface; public interface MQConsumer { void create(); + void create(boolean useTLS); + void start(); void shutdown(); diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java index 7dd747f8..0f94f360 100644 --- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java @@ -27,10 +27,16 @@ public class ConsumerFactory { public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup, String topic, String subExpression, - AbstractListener listner) { + AbstractListener listener) { + return getRMQNormalConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false); + } + + public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup, + String topic, String subExpression, + AbstractListener listener, boolean useTLS) { RMQNormalConsumer consumer = new RMQNormalConsumer(nsAddr, topic, subExpression, - consumerGroup, listner); - consumer.create(); + consumerGroup, listener); + consumer.create(useTLS); consumer.start(); return consumer; } diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java similarity index 85% rename from test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java rename to test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java index 679f8bd5..07de524a 100644 --- a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java +++ b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java @@ -25,24 +25,24 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.test.listener.AbstractListener; -public class RMQNormalListner extends AbstractListener implements MessageListenerConcurrently { +public class RMQNormalListener extends AbstractListener implements MessageListenerConcurrently { private ConsumeConcurrentlyStatus consumeStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; private AtomicInteger msgIndex = new AtomicInteger(0); - public RMQNormalListner() { + public RMQNormalListener() { super(); } - public RMQNormalListner(String listnerName) { - super(listnerName); + public RMQNormalListener(String listenerName) { + super(listenerName); } - public RMQNormalListner(ConsumeConcurrentlyStatus consumeStatus) { + public RMQNormalListener(ConsumeConcurrentlyStatus consumeStatus) { super(); this.consumeStatus = consumeStatus; } - public RMQNormalListner(String originMsgCollector, String msgBodyCollector) { + public RMQNormalListener(String originMsgCollector, String msgBodyCollector) { super(originMsgCollector, msgBodyCollector); } @@ -51,7 +51,7 @@ public class RMQNormalListner extends AbstractListener implements MessageListene for (MessageExt msg : msgs) { msgIndex.getAndIncrement(); if (isDebug) { - if (listenerName != null && listenerName != "") { + if (listenerName != null && !listenerName.isEmpty()) { logger.info(listenerName + ":" + msgIndex.get() + ":" + String.format("msgid:%s broker:%s queueId:%s offset:%s", msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 8516779e..5027a3cc 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -31,8 +31,6 @@ import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.listener.AbstractListener; import org.apache.rocketmq.test.util.MQAdmin; import org.apache.rocketmq.test.util.MQRandomUtils; -import org.apache.rocketmq.test.util.TestUtils; -import org.junit.Assert; public class BaseConf { protected static String nsAddr; @@ -82,7 +80,11 @@ public class BaseConf { } public static RMQNormalProducer getProducer(String nsAddr, String topic) { - RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic); + return getProducer(nsAddr, topic, false); + } + + public static RMQNormalProducer getProducer(String nsAddr, String topic, boolean useTLS) { + RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, useTLS); if (debug) { producer.setDebug(); } @@ -111,15 +113,25 @@ public class BaseConf { } public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression, - AbstractListener listner) { + AbstractListener listener) { + return getConsumer(nsAddr, topic, subExpression, listener, false); + } + + public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression, + AbstractListener listener, boolean useTLS) { String consumerGroup = initConsumerGroup(); - return getConsumer(nsAddr, consumerGroup, topic, subExpression, listner); + return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS); + } + + public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic, + String subExpression, AbstractListener listener) { + return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false); } public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic, - String subExpression, AbstractListener listner) { + String subExpression, AbstractListener listener, boolean useTLS) { RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup, - topic, subExpression, listner); + topic, subExpression, listener, useTLS); if (debug) { consumer.setDebug(); } @@ -129,7 +141,7 @@ public class BaseConf { return consumer; } - public static void shutDown() { + public static void shutdown() { try { for (Object mqClient : mqClients) { if (mqClient instanceof AbstractMQProducer) { diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java index 835f746f..ef126213 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.MQWait; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -45,15 +45,15 @@ public class NormalMsgDynamicBalanceIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testTwoConsumerAndCrashOne() { int msgSize = 400; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); producer.send(msgSize); @@ -79,11 +79,11 @@ public class NormalMsgDynamicBalanceIT extends BaseConf { @Test public void test3ConsumerAndCrashOne() { int msgSize = 400; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); producer.send(msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java index 9c9b2549..4c31f6a6 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.MQWait; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; @@ -46,15 +46,15 @@ public class NormalMsgStaticBalanceIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testTwoConsumersBalance() { int msgSize = 400; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); TestUtils.waitForSeconds(waitTime); producer.send(msgSize); @@ -75,13 +75,13 @@ public class NormalMsgStaticBalanceIT extends BaseConf { @Test public void testFourConsumersBalance() { int msgSize = 600; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); TestUtils.waitForSeconds(waitTime); producer.send(msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java index 41408b82..282a2487 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -46,7 +46,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -55,7 +55,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT { String group = initConsumerGroup(); RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*", - new RMQNormalListner(group + "_1")); + new RMQNormalListener(group + "_1")); Thread.sleep(3000); producer.send(msgSize); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); @@ -66,7 +66,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT { .containsExactlyElementsIn(producer.getAllMsgBody()); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, - consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); + consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2")); consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), waitTime); assertThat(consumer2.getListener().getAllMsgBody().size()).isEqualTo(0); } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java index fc0cfce8..977e61fb 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -56,9 +56,9 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT { String group = initConsumerGroup(); RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*", - new RMQNormalListner(group + "_1")); + new RMQNormalListener(group + "_1")); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, - consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); + consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2")); TestUtils.waitForSeconds(waitTime); producer.send(msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java index 0d3f6fbb..1178b7b8 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java @@ -22,7 +22,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -55,10 +55,10 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT { int msgSize = 16; RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*", - new RMQNormalListner()); + new RMQNormalListener()); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, consumer1.getConsumerGroup(), topic, "*", - new RMQNormalListner(ConsumeConcurrentlyStatus.RECONSUME_LATER)); + new RMQNormalListener(ConsumeConcurrentlyStatus.RECONSUME_LATER)); producer.send(msgSize); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java index dd1db0b0..987b4b29 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -56,7 +56,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT { String group = initConsumerGroup(); RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*", - new RMQNormalListner(group + "_1")); + new RMQNormalListener(group + "_1")); TestUtils.waitForSeconds(waitTime); producer.send(msgSize); @@ -71,7 +71,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT { consumer1.clearMsg(); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, - consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); + consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2")); TestUtils.waitForSeconds(waitTime); producer.send(msgSize); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java index 4fb5005f..019bcbb1 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -47,7 +47,7 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -57,9 +57,9 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT { String group1 = initConsumerGroup(); String group2 = initConsumerGroup(); RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group1, topic, "*", - new RMQNormalListner(group1 + "_1")); + new RMQNormalListener(group1 + "_1")); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, group2, topic, "*", - new RMQNormalListner(group2 + "_2")); + new RMQNormalListener(group2 + "_2")); TestUtils.waitForSeconds(waitTime); producer.send(msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java index 76e69b37..4c9e491e 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -47,7 +47,7 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -56,9 +56,9 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT { String group = initConsumerGroup(); RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*", - new RMQNormalListner(group + "_1")); + new RMQNormalListener(group + "_1")); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, - consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); + consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2")); TestUtils.waitForSeconds(waitTime); producer.send(msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java index 9d8aeb33..9294c3fd 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java @@ -49,7 +49,7 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java index 0d3b1ca1..a83270b9 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -45,7 +45,7 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -55,9 +55,9 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT { String tag2 = "jueyin_tag_2"; RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag1, - new RMQNormalListner()); + new RMQNormalListener()); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, - consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListner()); + consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListener()); TestUtils.waitForSeconds(waitTime); producer.send(tag2, msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java index 155a0aa0..9ffe05c3 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -45,7 +45,7 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -54,9 +54,9 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT { String tag = "jueyin_tag"; RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*", - new RMQNormalListner()); + new RMQNormalListener()); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, - consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner()); + consumer1.getConsumerGroup(), topic, tag, new RMQNormalListener()); TestUtils.waitForSeconds(waitTime); producer.send(tag, msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java index e89464f7..1c86a26b 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -45,7 +45,7 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -54,9 +54,9 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT { String tag = "jueyin_tag"; RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag, - new RMQNormalListner()); + new RMQNormalListener()); RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, - consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner()); + consumer1.getConsumerGroup(), topic, tag, new RMQNormalListener()); TestUtils.waitForSeconds(waitTime); producer.send(tag, msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java index 23248e3d..972666af 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT import org.apache.rocketmq.test.client.mq.MQAsyncProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.MQWait; import org.apache.rocketmq.test.util.TestUtils; import org.junit.After; @@ -46,20 +46,20 @@ public class DynamicAddAndCrashIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testAddOneConsumerAndCrashAfterWhile() { int msgSize = 150; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); asyncDefaultMQProducer.start(); TestUtils.waitForSeconds(waitTime); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); TestUtils.waitForSeconds(waitTime); consumer2.shutdown(); @@ -76,16 +76,16 @@ public class DynamicAddAndCrashIT extends BaseConf { @Test public void testAddTwoConsumerAndCrashAfterWhile() { int msgSize = 150; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); asyncDefaultMQProducer.start(); TestUtils.waitForSeconds(waitTime); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); TestUtils.waitForSeconds(waitTime); consumer2.shutdown(); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java index 9ef79534..279ee8c3 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT import org.apache.rocketmq.test.client.mq.MQAsyncProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.MQWait; import org.apache.rocketmq.test.util.TestUtils; import org.junit.After; @@ -46,20 +46,20 @@ public class DynamicAddConsumerIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testAddOneConsumer() { int msgSize = 100; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); asyncDefaultMQProducer.start(); TestUtils.waitForSeconds(waitTime); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); asyncDefaultMQProducer.waitSendAll(waitTime * 6); @@ -74,16 +74,16 @@ public class DynamicAddConsumerIT extends BaseConf { @Test public void testAddTwoConsumer() { int msgSize = 100; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); asyncDefaultMQProducer.start(); TestUtils.waitForSeconds(waitTime); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); asyncDefaultMQProducer.waitSendAll(waitTime * 6); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java index 13237206..68d9198c 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT import org.apache.rocketmq.test.client.mq.MQAsyncProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.MQWait; import org.apache.rocketmq.test.util.TestUtils; import org.junit.After; @@ -46,15 +46,15 @@ public class DynamicCrashConsumerIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testAddOneConsumer() { int msgSize = 100; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); asyncDefaultMQProducer.start(); @@ -75,11 +75,11 @@ public class DynamicCrashConsumerIT extends BaseConf { @Test public void testAddTwoConsumer() { int msgSize = 100; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); asyncDefaultMQProducer.start(); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java index 15d91a14..79f15dcc 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer; import org.apache.rocketmq.test.factory.ConsumerFactory; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -49,7 +49,7 @@ public class SqlFilterIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -58,7 +58,7 @@ public class SqlFilterIT extends BaseConf { String group = initConsumerGroup(); MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))"); - RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListner(group + "_1")); + RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListener(group + "_1")); Thread.sleep(3000); producer.send("TagA", msgSize); producer.send("TagB", msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java index 37ccb4d2..564da5ce 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.factory.MQMessageFactory; import org.apache.rocketmq.test.factory.TagMessage; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -49,7 +49,7 @@ public class MulTagSubIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -58,7 +58,7 @@ public class MulTagSubIT extends BaseConf { String subExpress = String.format("%s||jueyin2", tag); int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(tag, msgSize); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -75,7 +75,7 @@ public class MulTagSubIT extends BaseConf { String subExpress = String.format("%s||noExistTag", tag2); int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(tag1, msgSize); Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); @@ -98,7 +98,7 @@ public class MulTagSubIT extends BaseConf { TagMessage tagMessage = new TagMessage(tags, topic, msgSize); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(tagMessage.getMixedTagMessages()); Assert.assertEquals("Not all sent succeeded", msgSize * tags.length, @@ -119,7 +119,7 @@ public class MulTagSubIT extends BaseConf { TagMessage tagMessage = new TagMessage(tags, topic, msgSize); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(tagMessage.getMixedTagMessages()); Assert.assertEquals("Not all sent succeeded", msgSize * tags.length, @@ -141,7 +141,7 @@ public class MulTagSubIT extends BaseConf { TagMessage tagMessage = new TagMessage(tags, topic, msgSize); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(tagMessage.getMixedTagMessages()); Assert.assertEquals("Not all sent succeeded", msgSize * tags.length, diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java index 1952f305..423baae8 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.factory.MQMessageFactory; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -47,14 +47,14 @@ public class TagMessageWith1ConsumerIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testTagSmoke() { String tag = "jueyin"; int msgSize = 10; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListener()); producer.send(tag, msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -68,7 +68,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf { String subExprress = "*"; int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExprress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -84,7 +84,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf { String subExpress = "*"; int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(tag, msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -100,7 +100,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf { String subExpress = "*"; int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(tag, msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -116,7 +116,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf { String subExpress = null; int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(tag, msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -133,7 +133,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf { String subExpress = "*"; int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); List tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize); List tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize); @@ -156,7 +156,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf { String subExpress = null; int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); List tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize); List tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize); @@ -178,7 +178,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf { String subExpress = tag2; int msgSize = 10; RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, - new RMQNormalListner()); + new RMQNormalListener()); List tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize); List tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java index 8cc97f2f..31321975 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.factory.MQMessageFactory; import org.apache.rocketmq.test.factory.TagMessage; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -49,7 +49,7 @@ public class TagMessageWithMulConsumerIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -58,9 +58,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf { String tag2 = "jueyin2"; int msgSize = 10; RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tag1, - new RMQNormalListner()); + new RMQNormalListener()); RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tag2, - new RMQNormalListner()); + new RMQNormalListener()); List tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize); producer.send(tag1Msgs); @@ -89,9 +89,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf { TagMessage tagMessage = new TagMessage(tags, topic, msgSize); RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tags[0], - new RMQNormalListner()); + new RMQNormalListener()); RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tags[1], - new RMQNormalListner()); + new RMQNormalListener()); List tagMsgs = tagMessage.getMixedTagMessages(); producer.send(tagMsgs); @@ -120,9 +120,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf { TagMessage tagMessage = new TagMessage(tags, topic, msgSize); RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, sub1, - new RMQNormalListner()); + new RMQNormalListener()); RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, sub2, - new RMQNormalListner()); + new RMQNormalListener()); List tagMsgs = tagMessage.getMixedTagMessages(); producer.send(tagMsgs); @@ -152,13 +152,13 @@ public class TagMessageWithMulConsumerIT extends BaseConf { int msgSize = 10; RMQNormalConsumer consumerSubTwoMatchAll = getConsumer(nsAddr, topic, sub1, - new RMQNormalListner()); + new RMQNormalListener()); RMQNormalConsumer consumerSubTwoMachieOne = getConsumer(nsAddr, topic, sub2, - new RMQNormalListner()); + new RMQNormalListener()); RMQNormalConsumer consumerSubTag1 = getConsumer(nsAddr, topic, sub3, - new RMQNormalListner()); + new RMQNormalListener()); RMQNormalConsumer consumerSubAll = getConsumer(nsAddr, topic, sub4, - new RMQNormalListner()); + new RMQNormalListener()); producer.send(msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java index df8945e3..486f2902 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.RandomUtils; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; @@ -47,7 +47,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -56,9 +56,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { String originMsgDCName = RandomUtils.getStringByUUID(); String msgBodyDCName = RandomUtils.getStringByUUID(); RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, - new RMQNormalListner(originMsgDCName, msgBodyDCName)); + new RMQNormalListener(originMsgDCName, msgBodyDCName)); getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, - new RMQNormalListner(originMsgDCName, msgBodyDCName)); + new RMQNormalListener(originMsgDCName, msgBodyDCName)); producer.send(tag, msgSize); Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -75,11 +75,11 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { String msgBodyDCName = RandomUtils.getStringByUUID(); RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, - new RMQNormalListner(originMsgDCName, msgBodyDCName)); + new RMQNormalListener(originMsgDCName, msgBodyDCName)); producer.send(tag, msgSize, 100); TestUtils.waitForMoment(5); getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, - new RMQNormalListner(originMsgDCName, msgBodyDCName)); + new RMQNormalListener(originMsgDCName, msgBodyDCName)); TestUtils.waitForMoment(5); consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); @@ -95,9 +95,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { String msgBodyDCName = RandomUtils.getStringByUUID(); RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, - new RMQNormalListner(originMsgDCName, msgBodyDCName)); + new RMQNormalListener(originMsgDCName, msgBodyDCName)); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, - new RMQNormalListner(originMsgDCName, msgBodyDCName)); + new RMQNormalListener(originMsgDCName, msgBodyDCName)); producer.send(tag, msgSize, 100); TestUtils.waitForMoment(5); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java index 56d49afd..d81c5ef8 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java @@ -21,7 +21,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.factory.MQMessageFactory; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.MQWait; import org.junit.After; import org.junit.Assert; @@ -40,7 +40,7 @@ public class MulConsumerMulTopicIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -48,10 +48,10 @@ public class MulConsumerMulTopicIT extends BaseConf { int msgSize = 10; String topic1 = initTopic(); String topic2 = initTopic(); - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener()); consumer1.subscribe(topic2, "*"); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic1, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); consumer2.subscribe(topic2, "*"); producer.send(MQMessageFactory.getMsg(topic1, msgSize)); @@ -69,10 +69,10 @@ public class MulConsumerMulTopicIT extends BaseConf { String topic1 = initTopic(); String topic2 = initTopic(); String tag = "jueyin_tag"; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener()); consumer1.subscribe(topic2, tag); RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic1, - "*", new RMQNormalListner()); + "*", new RMQNormalListener()); consumer2.subscribe(topic2, tag); producer.send(MQMessageFactory.getMsg(topic1, msgSize)); @@ -91,9 +91,9 @@ public class MulConsumerMulTopicIT extends BaseConf { String topic2 = initTopic(); String tag1 = "jueyin_tag_1"; String tag2 = "jueyin_tag_2"; - RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener()); consumer1.subscribe(topic2, tag1); - RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener()); consumer2.subscribe(topic2, tag1); producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2)); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java index 8c1a2841..f448b849 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java @@ -21,7 +21,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.factory.MQMessageFactory; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -40,7 +40,7 @@ public class OneConsumerMulTopicIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test @@ -48,7 +48,7 @@ public class OneConsumerMulTopicIT extends BaseConf { int msgSize = 10; String topic1 = initTopic(); String topic2 = initTopic(); - RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener()); consumer.subscribe(topic2, "*"); producer.send(MQMessageFactory.getMsg(topic1, msgSize)); @@ -67,7 +67,7 @@ public class OneConsumerMulTopicIT extends BaseConf { String topic1 = initTopic(); String topic2 = initTopic(); String tag = "jueyin_tag"; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener()); consumer.subscribe(topic2, tag); producer.send(MQMessageFactory.getMsg(topic1, msgSize)); @@ -87,7 +87,7 @@ public class OneConsumerMulTopicIT extends BaseConf { String topic2 = initTopic(); String tag1 = "jueyin_tag_1"; String tag2 = "jueyin_tag_2"; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener()); consumer.subscribe(topic2, tag1); producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2)); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java index b3d258f6..0bad6ea2 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java @@ -50,7 +50,7 @@ public class AsyncSendExceptionIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java index 637774e2..3efc5317 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Before; @@ -45,14 +45,14 @@ public class AsyncSendWithMessageQueueIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testAsyncSendWithMQ() { int msgSize = 20; int queueId = 0; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); producer.asyncSend(msgSize, mq); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java index 12df9678..fc42e28a 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Before; @@ -48,14 +48,14 @@ public class AsyncSendWithMessageQueueSelectorIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testSendWithSelector() { int msgSize = 20; final int queueId = 0; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); producer.asyncSend(msgSize, new MessageQueueSelector() { @Override diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java index 7b0d9fdb..52ac2957 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java @@ -22,7 +22,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Before; @@ -44,13 +44,13 @@ public class AsyncSendWithOnlySendCallBackIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testSendWithOnlyCallBack() { int msgSize = 20; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); producer.asyncSend(msgSize); producer.waitForResponse(10 * 1000); assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java index 6fb34af6..d100fb0e 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java @@ -47,7 +47,7 @@ public class BatchSendIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java index b264ef73..b5536e39 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.factory.MessageFactory; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,7 +45,7 @@ public class MessageUserPropIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } /** @@ -58,7 +58,7 @@ public class MessageUserPropIT extends BaseConf { String msgValue = "jueyinValue"; msg.putUserProperty(msgKey, msgValue); - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); producer.send(msg, null); assertThat(producer.getAllMsgBody().size()).isEqualTo(1); @@ -80,7 +80,7 @@ public class MessageUserPropIT extends BaseConf { String msgValue = "jueyinzhi"; msg.putUserProperty(msgKey, msgValue); - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); producer.send(msg, null); assertThat(producer.getAllMsgBody().size()).isEqualTo(1); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java index fbaa3c29..b5e49cdb 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java @@ -39,7 +39,7 @@ public class ProducerGroupAndInstanceNameValidityIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } /** diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java index 19d8ca58..1113689b 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java @@ -44,7 +44,7 @@ public class OneWaySendExceptionIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test(expected = java.lang.NullPointerException.class) diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java index 17572dd9..efd582d4 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java @@ -22,7 +22,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Before; @@ -44,13 +44,13 @@ public class OneWaySendIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testOneWaySendWithOnlyMsgAsParam() { int msgSize = 20; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); producer.sendOneWay(msgSize); producer.waitForResponse(5 * 1000); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java index bfb45a80..ebab5161 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Before; @@ -46,14 +46,14 @@ public class OneWaySendWithMQIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testAsyncSendWithMQ() { int msgSize = 20; int queueId = 0; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); producer.sendOneWay(msgSize, mq); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java index 5dd3e447..ed567a06 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Before; @@ -49,14 +49,14 @@ public class OneWaySendWithSelectorIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test public void testSendWithSelector() { int msgSize = 20; final int queueId = 0; - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); producer.sendOneWay(msgSize, new MessageQueueSelector() { @Override diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java index 8ae936bd..ba2b3372 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java @@ -48,7 +48,7 @@ public class OrderMsgDynamicRebalanceIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java index 4b54feda..fa3320c5 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java @@ -49,7 +49,7 @@ public class OrderMsgIT extends BaseConf { @After public void tearDown() { - shutDown(); + shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java index 68a207c2..bae53974 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java @@ -48,7 +48,7 @@ public class OrderMsgRebalanceIT extends BaseConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java index 0636d58a..5d05570f 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java @@ -47,7 +47,7 @@ public class OrderMsgWithTagIT extends BaseConf { @After public void tearDown() { - shutDown(); + shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java index a1520a00..e6511119 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java @@ -42,7 +42,7 @@ public class QueryMsgByIdExceptionIT extends BaseConf { @AfterClass public static void tearDown() { - shutDown(); + shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java index 2d9bac72..88e8b462 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; @@ -44,12 +44,12 @@ public class QueryMsgByIdIT extends BaseConf { topic = initTopic(); logger.info(String.format("use topic: %s;", topic)); producer = getProducer(nsAddr, topic); - consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); } @After public void tearDown() { - shutDown(); + shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java index 68dd8db2..827d4f99 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java @@ -45,7 +45,7 @@ public class QueryMsgByKeyIT extends BaseConf { @After public void tearDown() { - shutDown(); + shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java index 9b18d5cc..8cb0f41c 100644 --- a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java @@ -47,7 +47,7 @@ public class NormalMsgDelayIT extends DelayConf { @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java index 35d91717..c7886554 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java @@ -21,7 +21,7 @@ import org.apache.log4j.Logger; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Assert; @@ -41,12 +41,12 @@ public class NormalMessageSendAndRecvIT extends BaseConf { topic = initTopic(); logger.info(String.format("use topic: %s;", topic)); producer = getProducer(nsAddr, topic); - consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); } @After public void tearDown() { - super.shutDown(); + super.shutdown(); } @Test diff --git a/test/src/test/java/org/apache/rocketmq/test/tls/TLS_IT.java b/test/src/test/java/org/apache/rocketmq/test/tls/TLS_IT.java new file mode 100644 index 00000000..2ff2b209 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/tls/TLS_IT.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.tls; + +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; +import org.apache.rocketmq.test.util.MQWait; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TLS_IT extends BaseConf { + + private RMQNormalProducer producer; + private RMQNormalConsumer consumer; + + private String topic; + + @Before + public void setUp() { + topic = initTopic(); + // Send messages via TLS + producer = getProducer(nsAddr, topic, true); + // Receive messages via TLS + consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), true); + } + + @After + public void tearDown() { + shutdown(); + } + + @Test + public void testSendAndReceiveMessageOverTLS() { + int numberOfMessagesToSend = 16; + producer.send(numberOfMessagesToSend); + + boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener()); + Assertions.assertThat(consumedAll).isEqualTo(true); + } + +} diff --git a/test/src/test/java/org/apache/rocketmq/test/tls/TLS_Mix2_IT.java b/test/src/test/java/org/apache/rocketmq/test/tls/TLS_Mix2_IT.java new file mode 100644 index 00000000..cd319e44 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/tls/TLS_Mix2_IT.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.tls; + +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; +import org.apache.rocketmq.test.util.MQWait; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TLS_Mix2_IT extends BaseConf { + + private RMQNormalProducer producer; + private RMQNormalConsumer consumer; + + private String topic; + + @Before + public void setUp() { + topic = initTopic(); + // send message via TLS + producer = getProducer(nsAddr, topic, true); + + // Receive message without TLS. + consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), false); + } + + @After + public void tearDown() { + shutdown(); + } + + @Test + public void testSendAndReceiveMessageOverTLS() { + int numberOfMessagesToSend = 16; + producer.send(numberOfMessagesToSend); + + boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener()); + Assertions.assertThat(consumedAll).isEqualTo(true); + } + +} diff --git a/test/src/test/java/org/apache/rocketmq/test/tls/TLS_Mix_IT.java b/test/src/test/java/org/apache/rocketmq/test/tls/TLS_Mix_IT.java new file mode 100644 index 00000000..77a61ae8 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/tls/TLS_Mix_IT.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.tls; + +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; +import org.apache.rocketmq.test.util.MQWait; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TLS_Mix_IT extends BaseConf { + + private RMQNormalProducer producer; + private RMQNormalConsumer consumer; + + private String topic; + + @Before + public void setUp() { + topic = initTopic(); + + // send message without TLS + producer = getProducer(nsAddr, topic); + + // Receive message via TLS + consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), true); + } + + @After + public void tearDown() { + shutdown(); + } + + @Test + public void testSendAndReceiveMessageOverTLS() { + int numberOfMessagesToSend = 16; + producer.send(numberOfMessagesToSend); + + boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener()); + Assertions.assertThat(consumedAll).isEqualTo(true); + } + +} -- GitLab