提交 a1764ab0 编写于 作者: L Li Zhanhui 提交者: dongeforever

[ROCKETMQ-28] Encrypt transmission layer closes apache/rocketmq#118

上级 1bbd4cd6
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.SslMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
......@@ -97,6 +98,7 @@ public class BrokerStartup {
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(NettySystemConfig.sslMode != SslMode.DISABLED);
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
......
......@@ -36,6 +36,12 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
......
......@@ -45,6 +45,8 @@ public class ClientConfig {
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean useTLS;
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
......@@ -92,6 +94,7 @@ public class ClientConfig {
this.unitMode = cc.unitMode;
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
this.useTLS = cc.useTLS;
}
public ClientConfig cloneClientConfig() {
......@@ -106,6 +109,7 @@ public class ClientConfig {
cc.unitMode = unitMode;
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
cc.useTLS = useTLS;
return cc;
}
......@@ -173,12 +177,20 @@ public class ClientConfig {
this.vipChannelEnabled = vipChannelEnabled;
}
public boolean isUseTLS() {
return useTLS;
}
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + "]";
+ vipChannelEnabled + ", useTLS=" + useTLS + "]";
}
}
......@@ -127,6 +127,7 @@ public class MQClientInstance {
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
......
......@@ -44,7 +44,7 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
......
......@@ -41,7 +41,7 @@ JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
......
......@@ -550,7 +550,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
<version>4.0.42.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
......
......@@ -45,5 +45,22 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
<version>1.1.33.Fork22</version>
<classifier>${os.detected.classifier}</classifier>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.0.Final</version>
</extension>
</extensions>
</build>
</project>
......@@ -27,24 +27,24 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
public void updateNameServerAddressList(final List<String> addrs);
void updateNameServerAddressList(final List<String> addrs);
public List<String> getNameServerAddressList();
List<String> getNameServerAddressList();
public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
public boolean isChannelWriteable(final String addr);
boolean isChannelWritable(final String addr);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.common;
/**
* For server, three SSL modes are supported: disabled, permissive and enforcing.
* <ol>
* <li><strong>disable:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.</li>
* <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or without SSL;</li>
* <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
* </ol>
*/
public enum SslMode {
DISABLED("disabled"),
PERMISSIVE("permissive"),
ENFORCING("enforcing");
private String name;
SslMode(String name) {
this.name = name;
}
public static SslMode parse(String mode) {
for (SslMode sslMode: SslMode.values()) {
if (sslMode.name.equals(mode)) {
return sslMode;
}
}
return PERMISSIVE;
}
public String getName() {
return name;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.ssl.SslHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/**
* <p>
* By default, file region are directly transferred to socket channel which is known as zero copy. In case we need
* to encrypt transmission, data being sent should go through the {@link SslHandler}. This encoder ensures this
* process.
* </p>
*/
public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> {
/**
* Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that
* can be handled by this encoder.
*
* @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
* io.netty.handler.codec.MessageToByteEncoder} belongs to
* @param msg the message to encode
* @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
* @throws Exception is thrown if an error occurs
*/
@Override
protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
WritableByteChannel writableByteChannel = new WritableByteChannel() {
@Override
public int write(ByteBuffer src) throws IOException {
out.writeBytes(src);
return out.capacity();
}
@Override
public boolean isOpen() {
return true;
}
@Override
public void close() throws IOException {
}
};
long toTransfer = msg.count();
while (true) {
long transferred = msg.transfered();
if (toTransfer - transferred <= 0) {
break;
}
msg.transferTo(writableByteChannel, transferred);
}
}
}
\ No newline at end of file
......@@ -38,6 +38,8 @@ public class NettyClientConfig {
private boolean clientPooledByteBufAllocatorEnable = false;
private boolean clientCloseSocketIfTimeout = false;
private boolean useTLS;
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
......@@ -125,4 +127,12 @@ public class NettyClientConfig {
public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
}
public boolean isUseTLS() {
return useTLS;
}
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
}
......@@ -20,6 +20,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
......@@ -88,6 +90,11 @@ public abstract class NettyRemotingAbstract {
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
* SSL context via which to create {@link SslHandler}.
*/
protected SslContext sslContext;
/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
*
......
......@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
......@@ -34,6 +35,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
......@@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -120,6 +123,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = SslHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (SSLException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
private static int initValueIndex() {
......@@ -151,7 +166,16 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
......@@ -421,17 +445,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
cw.getChannel().close();
channelTables.remove(addr);
}
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection = false;
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
return cw.getChannel();
cw.getChannel().close();
this.channelTables.remove(addr);
createNewConnection = true;
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
......@@ -530,10 +557,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
@Override
public boolean isChannelWriteable(String addr) {
public boolean isChannelWritable(String addr) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.isWriteable();
return cw.isWritable();
}
return true;
}
......@@ -569,7 +596,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
public boolean isWriteable() {
public boolean isWritable() {
return this.channelFuture.channel().isWritable();
}
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
......@@ -37,12 +38,15 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -50,6 +54,7 @@ import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.SslMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
......@@ -74,6 +79,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private int port = 0;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
......@@ -129,6 +138,20 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
});
}
SslMode sslMode = NettySystemConfig.sslMode;
log.info("Server is running in TLS {} mode", sslMode.getName());
if (sslMode != SslMode.DISABLED) {
try {
sslContext = SslHelper.buildSslContext(false);
log.info("SSLContext created for server");
} catch (CertificateException e) {
log.error("Failed to create SSLContext for server", e);
} catch (SSLException e) {
log.error("Failed to create SSLContext for server", e);
}
}
}
private boolean useEpoll() {
......@@ -164,13 +187,16 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler());
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(NettySystemConfig.sslMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
......@@ -298,6 +324,68 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return this.publicExecutor;
}
class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final SslMode sslMode;
private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
HandshakeHandler(SslMode sslMode) {
this.sslMode = sslMode;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// mark the current position so that we can peek the first byte to determine if the content is starting with
// TLS handshake
msg.markReaderIndex();
byte b = msg.getByte(0);
if (b == HANDSHAKE_MAGIC_CODE) {
switch (sslMode) {
case DISABLED:
ctx.close();
log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
break;
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error("Trying to establish a SSL connection but sslContext is null");
}
break;
default:
log.warn("Unknown TLS mode");
break;
}
} else if (sslMode == SslMode.ENFORCING) {
ctx.close();
log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
}
// reset the reader index so that handshake negotiation may proceed as normal.
msg.resetReaderIndex();
try {
// Remove this handler
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
// Hand over this message to the next .
ctx.fireChannelRead(msg.retain());
}
}
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
......
......@@ -17,6 +17,8 @@
package org.apache.rocketmq.remoting.netty;
import org.apache.rocketmq.remoting.common.SslMode;
public class NettySystemConfig {
public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
"com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
......@@ -28,10 +30,16 @@ public class NettySystemConfig {
"com.rocketmq.remoting.clientAsyncSemaphoreValue";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
"com.rocketmq.remoting.clientOnewaySemaphoreValue";
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
Boolean
.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE =
public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE = //
"org.apache.rocketmq.remoting.ssl.mode";
public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE = //
"org.apache.rocketmq.remoting.ssl.config.file";
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
......@@ -39,4 +47,18 @@ public class NettySystemConfig {
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
public static int socketRcvbufSize =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
/**
* For server, three SSL modes are supported: disabled, permissive and enforcing.
* <ol>
* <li><strong>disable:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.</li>
* <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or without SSL;</li>
* <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
* </ol>
*/
public static SslMode sslMode = //
SslMode.parse(System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE, "permissive"));
public static String sslConfigFile = //
System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE, "/etc/rocketmq/ssl.properties");
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.cert.CertificateException;
import java.util.Properties;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SslHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
public static SslContext buildSslContext(boolean forClient) throws SSLException, CertificateException {
File configFile = new File(NettySystemConfig.sslConfigFile);
boolean testMode = !(configFile.exists() && configFile.isFile() && configFile.canRead());
Properties properties = null;
if (!testMode) {
properties = new Properties();
InputStream inputStream = null;
try {
inputStream = new FileInputStream(configFile);
properties.load(inputStream);
} catch (FileNotFoundException ignore) {
} catch (IOException ignore) {
} finally {
if (null != inputStream) {
try {
inputStream.close();
} catch (IOException ignore) {
}
}
}
}
SslProvider provider = null;
if (OpenSsl.isAvailable()) {
provider = SslProvider.OPENSSL;
LOGGER.info("Using OpenSSL provider");
} else {
provider = SslProvider.JDK;
LOGGER.info("Using JDK SSL provider");
}
if (forClient) {
if (testMode) {
return SslContextBuilder
.forClient()
.sslProvider(SslProvider.JDK)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
} else {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);
if ("false".equals(properties.getProperty("client.auth.server"))) {
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
} else {
if (properties.containsKey("client.trustManager")) {
sslContextBuilder.trustManager(new File(properties.getProperty("client.trustManager")));
}
}
return sslContextBuilder.keyManager(
properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null,
properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.keyFile")) : null,
properties.containsKey("client.password") ? properties.getProperty("client.password") : null)
.build();
}
} else {
if (testMode) {
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
return SslContextBuilder
.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
.sslProvider(SslProvider.JDK)
.clientAuth(ClientAuth.OPTIONAL)
.build();
} else {
return SslContextBuilder.forServer(
properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null,
properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.keyFile")) : null,
properties.containsKey("server.password") ? properties.getProperty("server.password") : null)
.sslProvider(provider)
.trustManager(new File(properties.getProperty("server.trustManager")))
.clientAuth(parseClientAuthMode(properties.getProperty("server.auth.client")))
.build();
}
}
}
private static ClientAuth parseClientAuthMode(String authMode) {
if (null == authMode || authMode.trim().isEmpty()) {
return ClientAuth.NONE;
}
if ("optional".equalsIgnoreCase(authMode)) {
return ClientAuth.OPTIONAL;
}
return ClientAuth.REQUIRE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;
public class FileRegionEncoderTest {
/**
* This unit test case ensures that {@link FileRegionEncoder} indeed wraps {@link FileRegion} to
* {@link ByteBuf}.
* @throws IOException if there is an error.
*/
@Test
public void testEncode() throws IOException {
FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
file.deleteOnExit();
Random random = new Random(System.currentTimeMillis());
int dataLength = 1 << 10;
byte[] data = new byte[dataLength];
random.nextBytes(data);
write(file, data);
FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
Assert.assertEquals(0, fileRegion.transfered());
Assert.assertEquals(dataLength, fileRegion.count());
Assert.assertTrue(channel.writeOutbound(fileRegion));
ByteBuf out = (ByteBuf) channel.readOutbound();
byte[] arr = new byte[out.readableBytes()];
out.getBytes(0, arr);
Assert.assertArrayEquals("Data should be identical", data, arr);
}
/**
* Write byte array to the specified file.
*
* @param file File to write to.
* @param data byte array to write.
* @throws IOException in case there is an exception.
*/
private static void write(File file, byte[] data) throws IOException {
BufferedOutputStream bufferedOutputStream = null;
try {
bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(file, false));
bufferedOutputStream.write(data);
bufferedOutputStream.flush();
} finally {
if (null != bufferedOutputStream) {
bufferedOutputStream.close();
}
}
}
}
\ No newline at end of file
......@@ -29,8 +29,8 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
protected DefaultMQPushConsumer consumer = null;
public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
String consumerGroup, AbstractListener listner) {
super(nsAddr, topic, subExpression, consumerGroup, listner);
String consumerGroup, AbstractListener listener) {
super(nsAddr, topic, subExpression, consumerGroup, listener);
}
public AbstractListener getListener() {
......@@ -42,6 +42,10 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
}
public void create() {
create(false);
}
public void create(boolean useTLS) {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr);
......@@ -52,6 +56,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
e.printStackTrace();
}
consumer.setMessageListener(listener);
consumer.setUseTLS(useTLS);
}
public void start() {
......
......@@ -34,20 +34,29 @@ public class RMQNormalProducer extends AbstractMQProducer {
private String nsAddr = null;
public RMQNormalProducer(String nsAddr, String topic) {
this(nsAddr, topic, false);
}
public RMQNormalProducer(String nsAddr, String topic, boolean useTLS) {
super(topic);
this.nsAddr = nsAddr;
create();
create(useTLS);
start();
}
public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
String producerInstanceName) {
this(nsAddr, topic, producerGroupName, producerInstanceName, false);
}
public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
String producerInstanceName, boolean useTLS) {
super(topic);
this.producerGroupName = producerGroupName;
this.producerInstanceName = producerInstanceName;
this.nsAddr = nsAddr;
create();
create(useTLS);
start();
}
......@@ -59,17 +68,18 @@ public class RMQNormalProducer extends AbstractMQProducer {
this.producer = producer;
}
protected void create() {
protected void create(boolean useTLS) {
producer = new DefaultMQProducer();
producer.setProducerGroup(getProducerGroupName());
producer.setInstanceName(getProducerInstanceName());
producer.setUseTLS(useTLS);
if (nsAddr != null) {
producer.setNamesrvAddr(nsAddr);
}
}
public void start() {
try {
producer.start();
......@@ -83,10 +93,10 @@ public class RMQNormalProducer extends AbstractMQProducer {
public SendResult send(Object msg, Object orderKey) {
org.apache.rocketmq.client.producer.SendResult metaqResult = null;
Message metaqMsg = (Message) msg;
Message message = (Message) msg;
try {
long start = System.currentTimeMillis();
metaqResult = producer.send(metaqMsg);
metaqResult = producer.send(message);
this.msgRTs.addData(System.currentTimeMillis() - start);
if (isDebug) {
logger.info(metaqResult);
......@@ -94,9 +104,9 @@ public class RMQNormalProducer extends AbstractMQProducer {
sendResult.setMsgId(metaqResult.getMsgId());
sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
msgBodys.addData(new String(metaqMsg.getBody()));
msgBodys.addData(new String(message.getBody()));
originMsgs.addData(msg);
originMsgIndex.put(new String(metaqMsg.getBody()), metaqResult);
originMsgIndex.put(new String(message.getBody()), metaqResult);
} catch (Exception e) {
if (isDebug) {
e.printStackTrace();
......
......@@ -69,8 +69,8 @@ public abstract class AbstractMQConsumer implements MQConsumer {
return listener;
}
public void setListener(AbstractListener listener) {
this.listener = listener;
public void setListener(AbstractListener listner) {
this.listener = listner;
}
public String getNsAddr() {
......
......@@ -20,6 +20,8 @@ package org.apache.rocketmq.test.clientinterface;
public interface MQConsumer {
void create();
void create(boolean useTLS);
void start();
void shutdown();
......
......@@ -27,10 +27,16 @@ public class ConsumerFactory {
public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
AbstractListener listner) {
AbstractListener listener) {
return getRMQNormalConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
AbstractListener listener, boolean useTLS) {
RMQNormalConsumer consumer = new RMQNormalConsumer(nsAddr, topic, subExpression,
consumerGroup, listner);
consumer.create();
consumerGroup, listener);
consumer.create(useTLS);
consumer.start();
return consumer;
}
......
......@@ -25,24 +25,24 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.listener.AbstractListener;
public class RMQNormalListner extends AbstractListener implements MessageListenerConcurrently {
public class RMQNormalListener extends AbstractListener implements MessageListenerConcurrently {
private ConsumeConcurrentlyStatus consumeStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
private AtomicInteger msgIndex = new AtomicInteger(0);
public RMQNormalListner() {
public RMQNormalListener() {
super();
}
public RMQNormalListner(String listnerName) {
super(listnerName);
public RMQNormalListener(String listenerName) {
super(listenerName);
}
public RMQNormalListner(ConsumeConcurrentlyStatus consumeStatus) {
public RMQNormalListener(ConsumeConcurrentlyStatus consumeStatus) {
super();
this.consumeStatus = consumeStatus;
}
public RMQNormalListner(String originMsgCollector, String msgBodyCollector) {
public RMQNormalListener(String originMsgCollector, String msgBodyCollector) {
super(originMsgCollector, msgBodyCollector);
}
......@@ -51,7 +51,7 @@ public class RMQNormalListner extends AbstractListener implements MessageListene
for (MessageExt msg : msgs) {
msgIndex.getAndIncrement();
if (isDebug) {
if (listenerName != null && listenerName != "") {
if (listenerName != null && !listenerName.isEmpty()) {
logger.info(listenerName + ":" + msgIndex.get() + ":"
+ String.format("msgid:%s broker:%s queueId:%s offset:%s",
msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(),
......
......@@ -31,8 +31,6 @@ import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.Assert;
public class BaseConf {
protected static String nsAddr;
......@@ -82,7 +80,11 @@ public class BaseConf {
}
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic);
return getProducer(nsAddr, topic, false);
}
public static RMQNormalProducer getProducer(String nsAddr, String topic, boolean useTLS) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, useTLS);
if (debug) {
producer.setDebug();
}
......@@ -111,15 +113,25 @@ public class BaseConf {
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
AbstractListener listner) {
AbstractListener listener) {
return getConsumer(nsAddr, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
AbstractListener listener, boolean useTLS) {
String consumerGroup = initConsumerGroup();
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listner);
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
String subExpression, AbstractListener listener) {
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
String subExpression, AbstractListener listner) {
String subExpression, AbstractListener listener, boolean useTLS) {
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
topic, subExpression, listner);
topic, subExpression, listener, useTLS);
if (debug) {
consumer.setDebug();
}
......@@ -129,7 +141,7 @@ public class BaseConf {
return consumer;
}
public static void shutDown() {
public static void shutdown() {
try {
for (Object mqClient : mqClients) {
if (mqClient instanceof AbstractMQProducer) {
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -45,15 +45,15 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testTwoConsumerAndCrashOne() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
producer.send(msgSize);
......@@ -79,11 +79,11 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
@Test
public void test3ConsumerAndCrashOne() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
producer.send(msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
......@@ -46,15 +46,15 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testTwoConsumersBalance() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......@@ -75,13 +75,13 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
@Test
public void testFourConsumersBalance() {
int msgSize = 600;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -46,7 +46,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -55,7 +55,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
new RMQNormalListener(group + "_1"));
Thread.sleep(3000);
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
......@@ -66,7 +66,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
.containsExactlyElementsIn(producer.getAllMsgBody());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), waitTime);
assertThat(consumer2.getListener().getAllMsgBody().size()).isEqualTo(0);
}
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -56,9 +56,9 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
new RMQNormalListener(group + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -22,7 +22,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -55,10 +55,10 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT {
int msgSize = 16;
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*",
new RMQNormalListner());
new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*",
new RMQNormalListner(ConsumeConcurrentlyStatus.RECONSUME_LATER));
new RMQNormalListener(ConsumeConcurrentlyStatus.RECONSUME_LATER));
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -56,7 +56,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
new RMQNormalListener(group + "_1"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......@@ -71,7 +71,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
consumer1.clearMsg();
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -47,7 +47,7 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -57,9 +57,9 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT {
String group1 = initConsumerGroup();
String group2 = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group1, topic, "*",
new RMQNormalListner(group1 + "_1"));
new RMQNormalListener(group1 + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, group2, topic, "*",
new RMQNormalListner(group2 + "_2"));
new RMQNormalListener(group2 + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -47,7 +47,7 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -56,9 +56,9 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
new RMQNormalListener(group + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -49,7 +49,7 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -55,9 +55,9 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT {
String tag2 = "jueyin_tag_2";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag1,
new RMQNormalListner());
new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListner());
consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag2, msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -54,9 +54,9 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT {
String tag = "jueyin_tag";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*",
new RMQNormalListner());
new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner());
consumer1.getConsumerGroup(), topic, tag, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag, msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -54,9 +54,9 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT {
String tag = "jueyin_tag";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag,
new RMQNormalListner());
new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner());
consumer1.getConsumerGroup(), topic, tag, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag, msgSize);
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
......@@ -46,20 +46,20 @@ public class DynamicAddAndCrashIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAddOneConsumerAndCrashAfterWhile() {
int msgSize = 150;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
consumer2.shutdown();
......@@ -76,16 +76,16 @@ public class DynamicAddAndCrashIT extends BaseConf {
@Test
public void testAddTwoConsumerAndCrashAfterWhile() {
int msgSize = 150;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
consumer2.shutdown();
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
......@@ -46,20 +46,20 @@ public class DynamicAddConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAddOneConsumer() {
int msgSize = 100;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
asyncDefaultMQProducer.waitSendAll(waitTime * 6);
......@@ -74,16 +74,16 @@ public class DynamicAddConsumerIT extends BaseConf {
@Test
public void testAddTwoConsumer() {
int msgSize = 100;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
asyncDefaultMQProducer.waitSendAll(waitTime * 6);
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
......@@ -46,15 +46,15 @@ public class DynamicCrashConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAddOneConsumer() {
int msgSize = 100;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
......@@ -75,11 +75,11 @@ public class DynamicCrashConsumerIT extends BaseConf {
@Test
public void testAddTwoConsumer() {
int msgSize = 100;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
......
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -49,7 +49,7 @@ public class SqlFilterIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -58,7 +58,7 @@ public class SqlFilterIT extends BaseConf {
String group = initConsumerGroup();
MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListner(group + "_1"));
RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListener(group + "_1"));
Thread.sleep(3000);
producer.send("TagA", msgSize);
producer.send("TagB", msgSize);
......
......@@ -24,7 +24,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.factory.TagMessage;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -49,7 +49,7 @@ public class MulTagSubIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -58,7 +58,7 @@ public class MulTagSubIT extends BaseConf {
String subExpress = String.format("%s||jueyin2", tag);
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -75,7 +75,7 @@ public class MulTagSubIT extends BaseConf {
String subExpress = String.format("%s||noExistTag", tag2);
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag1, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
......@@ -98,7 +98,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
......@@ -119,7 +119,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
......@@ -141,7 +141,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -47,14 +47,14 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testTagSmoke() {
String tag = "jueyin";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -68,7 +68,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExprress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExprress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -84,7 +84,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -100,7 +100,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -116,7 +116,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = null;
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -133,7 +133,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
......@@ -156,7 +156,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = null;
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
......@@ -178,7 +178,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = tag2;
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
......
......@@ -25,7 +25,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.factory.TagMessage;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -49,7 +49,7 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -58,9 +58,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
String tag2 = "jueyin2";
int msgSize = 10;
RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tag1,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tag2,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
producer.send(tag1Msgs);
......@@ -89,9 +89,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tags[0],
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tags[1],
new RMQNormalListner());
new RMQNormalListener());
List<Object> tagMsgs = tagMessage.getMixedTagMessages();
producer.send(tagMsgs);
......@@ -120,9 +120,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, sub1,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, sub2,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tagMsgs = tagMessage.getMixedTagMessages();
producer.send(tagMsgs);
......@@ -152,13 +152,13 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
int msgSize = 10;
RMQNormalConsumer consumerSubTwoMatchAll = getConsumer(nsAddr, topic, sub1,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerSubTwoMachieOne = getConsumer(nsAddr, topic, sub2,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerSubTag1 = getConsumer(nsAddr, topic, sub3,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerSubAll = getConsumer(nsAddr, topic, sub4,
new RMQNormalListner());
new RMQNormalListener());
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.RandomUtils;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
......@@ -47,7 +47,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -56,9 +56,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
String originMsgDCName = RandomUtils.getStringByUUID();
String msgBodyDCName = RandomUtils.getStringByUUID();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -75,11 +75,11 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
String msgBodyDCName = RandomUtils.getStringByUUID();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100);
TestUtils.waitForMoment(5);
getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
TestUtils.waitForMoment(5);
consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -95,9 +95,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
String msgBodyDCName = RandomUtils.getStringByUUID();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100);
TestUtils.waitForMoment(5);
......
......@@ -21,7 +21,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.junit.After;
import org.junit.Assert;
......@@ -40,7 +40,7 @@ public class MulConsumerMulTopicIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -48,10 +48,10 @@ public class MulConsumerMulTopicIT extends BaseConf {
int msgSize = 10;
String topic1 = initTopic();
String topic2 = initTopic();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer1.subscribe(topic2, "*");
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic1,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
consumer2.subscribe(topic2, "*");
producer.send(MQMessageFactory.getMsg(topic1, msgSize));
......@@ -69,10 +69,10 @@ public class MulConsumerMulTopicIT extends BaseConf {
String topic1 = initTopic();
String topic2 = initTopic();
String tag = "jueyin_tag";
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer1.subscribe(topic2, tag);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic1,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
consumer2.subscribe(topic2, tag);
producer.send(MQMessageFactory.getMsg(topic1, msgSize));
......@@ -91,9 +91,9 @@ public class MulConsumerMulTopicIT extends BaseConf {
String topic2 = initTopic();
String tag1 = "jueyin_tag_1";
String tag2 = "jueyin_tag_2";
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer1.subscribe(topic2, tag1);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer2.subscribe(topic2, tag1);
producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2));
......
......@@ -21,7 +21,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -40,7 +40,7 @@ public class OneConsumerMulTopicIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -48,7 +48,7 @@ public class OneConsumerMulTopicIT extends BaseConf {
int msgSize = 10;
String topic1 = initTopic();
String topic2 = initTopic();
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer.subscribe(topic2, "*");
producer.send(MQMessageFactory.getMsg(topic1, msgSize));
......@@ -67,7 +67,7 @@ public class OneConsumerMulTopicIT extends BaseConf {
String topic1 = initTopic();
String topic2 = initTopic();
String tag = "jueyin_tag";
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer.subscribe(topic2, tag);
producer.send(MQMessageFactory.getMsg(topic1, msgSize));
......@@ -87,7 +87,7 @@ public class OneConsumerMulTopicIT extends BaseConf {
String topic2 = initTopic();
String tag1 = "jueyin_tag_1";
String tag2 = "jueyin_tag_2";
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer.subscribe(topic2, tag1);
producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2));
......
......@@ -50,7 +50,7 @@ public class AsyncSendExceptionIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -45,14 +45,14 @@ public class AsyncSendWithMessageQueueIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAsyncSendWithMQ() {
int msgSize = 20;
int queueId = 0;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
producer.asyncSend(msgSize, mq);
......
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -48,14 +48,14 @@ public class AsyncSendWithMessageQueueSelectorIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testSendWithSelector() {
int msgSize = 20;
final int queueId = 0;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.asyncSend(msgSize, new MessageQueueSelector() {
@Override
......
......@@ -22,7 +22,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -44,13 +44,13 @@ public class AsyncSendWithOnlySendCallBackIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testSendWithOnlyCallBack() {
int msgSize = 20;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.asyncSend(msgSize);
producer.waitForResponse(10 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
......
......@@ -47,7 +47,7 @@ public class BatchSendIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -24,7 +24,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -45,7 +45,7 @@ public class MessageUserPropIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
/**
......@@ -58,7 +58,7 @@ public class MessageUserPropIT extends BaseConf {
String msgValue = "jueyinValue";
msg.putUserProperty(msgKey, msgValue);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.send(msg, null);
assertThat(producer.getAllMsgBody().size()).isEqualTo(1);
......@@ -80,7 +80,7 @@ public class MessageUserPropIT extends BaseConf {
String msgValue = "jueyinzhi";
msg.putUserProperty(msgKey, msgValue);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.send(msg, null);
assertThat(producer.getAllMsgBody().size()).isEqualTo(1);
......
......@@ -39,7 +39,7 @@ public class ProducerGroupAndInstanceNameValidityIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
/**
......
......@@ -44,7 +44,7 @@ public class OneWaySendExceptionIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test(expected = java.lang.NullPointerException.class)
......
......@@ -22,7 +22,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -44,13 +44,13 @@ public class OneWaySendIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testOneWaySendWithOnlyMsgAsParam() {
int msgSize = 20;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.sendOneWay(msgSize);
producer.waitForResponse(5 * 1000);
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -46,14 +46,14 @@ public class OneWaySendWithMQIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAsyncSendWithMQ() {
int msgSize = 20;
int queueId = 0;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
producer.sendOneWay(msgSize, mq);
......
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -49,14 +49,14 @@ public class OneWaySendWithSelectorIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testSendWithSelector() {
int msgSize = 20;
final int queueId = 0;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.sendOneWay(msgSize, new MessageQueueSelector() {
@Override
......
......@@ -48,7 +48,7 @@ public class OrderMsgDynamicRebalanceIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -49,7 +49,7 @@ public class OrderMsgIT extends BaseConf {
@After
public void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -48,7 +48,7 @@ public class OrderMsgRebalanceIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -47,7 +47,7 @@ public class OrderMsgWithTagIT extends BaseConf {
@After
public void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -42,7 +42,7 @@ public class QueryMsgByIdExceptionIT extends BaseConf {
@AfterClass
public static void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -44,12 +44,12 @@ public class QueryMsgByIdIT extends BaseConf {
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic);
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
}
@After
public void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -45,7 +45,7 @@ public class QueryMsgByKeyIT extends BaseConf {
@After
public void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -47,7 +47,7 @@ public class NormalMsgDelayIT extends DelayConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -41,12 +41,12 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic);
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
}
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.tls;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TLS_IT extends BaseConf {
private RMQNormalProducer producer;
private RMQNormalConsumer consumer;
private String topic;
@Before
public void setUp() {
topic = initTopic();
// Send messages via TLS
producer = getProducer(nsAddr, topic, true);
// Receive messages via TLS
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), true);
}
@After
public void tearDown() {
shutdown();
}
@Test
public void testSendAndReceiveMessageOverTLS() {
int numberOfMessagesToSend = 16;
producer.send(numberOfMessagesToSend);
boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener());
Assertions.assertThat(consumedAll).isEqualTo(true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.tls;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TLS_Mix2_IT extends BaseConf {
private RMQNormalProducer producer;
private RMQNormalConsumer consumer;
private String topic;
@Before
public void setUp() {
topic = initTopic();
// send message via TLS
producer = getProducer(nsAddr, topic, true);
// Receive message without TLS.
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), false);
}
@After
public void tearDown() {
shutdown();
}
@Test
public void testSendAndReceiveMessageOverTLS() {
int numberOfMessagesToSend = 16;
producer.send(numberOfMessagesToSend);
boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener());
Assertions.assertThat(consumedAll).isEqualTo(true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.tls;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TLS_Mix_IT extends BaseConf {
private RMQNormalProducer producer;
private RMQNormalConsumer consumer;
private String topic;
@Before
public void setUp() {
topic = initTopic();
// send message without TLS
producer = getProducer(nsAddr, topic);
// Receive message via TLS
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), true);
}
@After
public void tearDown() {
shutdown();
}
@Test
public void testSendAndReceiveMessageOverTLS() {
int numberOfMessagesToSend = 16;
producer.send(numberOfMessagesToSend);
boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener());
Assertions.assertThat(consumedAll).isEqualTo(true);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册