提交 6f0c9966 编写于 作者: 傅冲

Merge remote-tracking branch 'apache/develop'

......@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.SslMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
......@@ -97,6 +98,7 @@ public class BrokerStartup {
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(NettySystemConfig.sslMode != SslMode.DISABLED);
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
......
......@@ -36,6 +36,12 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
......
......@@ -45,6 +45,8 @@ public class ClientConfig {
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean useTLS;
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
......@@ -92,6 +94,7 @@ public class ClientConfig {
this.unitMode = cc.unitMode;
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
this.useTLS = cc.useTLS;
}
public ClientConfig cloneClientConfig() {
......@@ -106,6 +109,7 @@ public class ClientConfig {
cc.unitMode = unitMode;
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
cc.useTLS = useTLS;
return cc;
}
......@@ -173,12 +177,20 @@ public class ClientConfig {
this.vipChannelEnabled = vipChannelEnabled;
}
public boolean isUseTLS() {
return useTLS;
}
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + "]";
+ vipChannelEnabled + ", useTLS=" + useTLS + "]";
}
}
......@@ -166,10 +166,42 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000;
/**
* Flow control threshold
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
/**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*
* <p>
* The size of a message only measured by message body, so it's not accurate
*/
private int pullThresholdSizeForQueue = 100;
/**
* Flow control threshold on topic level, default value is -1(Unlimited)
* <p>
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
* {@code pullThresholdForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
* then pullThresholdForQueue will be set to 100
*/
private int pullThresholdForTopic = -1;
/**
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
* <p>
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
* {@code pullThresholdSizeForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
* assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
/**
* Message pull Interval
*/
......@@ -407,6 +439,30 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.pullThresholdForQueue = pullThresholdForQueue;
}
public int getPullThresholdForTopic() {
return pullThresholdForTopic;
}
public void setPullThresholdForTopic(final int pullThresholdForTopic) {
this.pullThresholdForTopic = pullThresholdForTopic;
}
public int getPullThresholdSizeForQueue() {
return pullThresholdSizeForQueue;
}
public void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue) {
this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
}
public int getPullThresholdSizeForTopic() {
return pullThresholdSizeForTopic;
}
public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) {
this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
}
public Map<String, String> getSubscription() {
return subscription;
}
......
......@@ -161,7 +161,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
final long beginTime = System.currentTimeMillis();
log.info("consumeMessageDirectly receive new messge: {}", msg);
log.info("consumeMessageDirectly receive new message: {}", msg);
try {
ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
......
......@@ -144,7 +144,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
final long beginTime = System.currentTimeMillis();
log.info("consumeMessageDirectly receive new messge: {}", msg);
log.info("consumeMessageDirectly receive new message: {}", msg);
try {
ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context);
......
......@@ -569,6 +569,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
default:
break;
}
this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
......
......@@ -106,8 +106,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private MessageListener messageListenerInner;
private OffsetStore offsetStore;
private ConsumeMessageService consumeMessageService;
private long flowControlTimes1 = 0;
private long flowControlTimes2 = 0;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
......@@ -219,13 +219,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return;
}
long size = processQueue.getMsgCount().get();
if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((flowControlTimes1++ % 1000) == 0) {
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
......@@ -233,11 +245,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((flowControlTimes2++ % 1000) == 0) {
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, flowControlTimes2);
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
......@@ -577,6 +589,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
......@@ -731,6 +744,34 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
null);
}
// pullThresholdForTopic
if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
throw new MQClientException(
"pullThresholdForTopic Out of range [1, 6553500]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
// pullThresholdSizeForQueue
if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
throw new MQClientException(
"pullThresholdSizeForQueue Out of range [1, 1024]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
// pullThresholdSizeForTopic
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
throw new MQClientException(
"pullThresholdSizeForTopic Out of range [1, 102400]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
// pullInterval
if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
throw new MQClientException(
......
......@@ -45,6 +45,7 @@ public class ProcessQueue {
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
......@@ -129,6 +130,7 @@ public class ProcessQueue {
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
......@@ -189,6 +191,7 @@ public class ProcessQueue {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
......@@ -215,6 +218,10 @@ public class ProcessQueue {
return msgCount;
}
public AtomicLong getMsgSize() {
return msgSize;
}
public boolean isDropped() {
return dropped;
}
......@@ -250,7 +257,10 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.msgTreeMapTemp.lastKey();
msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
for (MessageExt msg : this.msgTreeMapTemp.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
this.msgTreeMapTemp.clear();
if (offset != null) {
return offset + 1;
......@@ -334,6 +344,7 @@ public class ProcessQueue {
this.msgTreeMap.clear();
this.msgTreeMapTemp.clear();
this.msgCount.set(0);
this.msgSize.set(0);
this.queueOffsetMax = 0L;
} finally {
this.lockTreeMap.writeLock().unlock();
......@@ -387,6 +398,7 @@ public class ProcessQueue {
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
info.setCachedMsgCount(this.msgTreeMap.size());
info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
}
if (!this.msgTreeMapTemp.isEmpty()) {
......
......@@ -57,6 +57,26 @@ public class RebalancePushImpl extends RebalanceImpl {
long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
if (pullThresholdForTopic != -1) {
int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
log.info("The pullThresholdForQueue is changed from {} to {}",
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
}
int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
if (pullThresholdSizeForTopic != -1) {
int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
log.info("The pullThresholdSizeForQueue is changed from {} to {}",
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
}
}
// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}
......
......@@ -127,6 +127,7 @@ public class MQClientInstance {
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
......
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -83,6 +84,11 @@ public class DefaultMQPullConsumerTest {
pullConsumer.shutdown();
}
@Test
public void testStart_OffsetShouldNotNUllAfterStart() {
Assert.assertNotNull(pullConsumer.getOffsetStore());
}
@Test
public void testPullMessage_Success() throws Exception {
doAnswer(new Answer() {
......
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
......@@ -52,6 +53,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -61,6 +63,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -155,6 +158,11 @@ public class DefaultMQPushConsumerTest {
pushConsumer.shutdown();
}
@Test
public void testStart_OffsetShouldNotNUllAfterStart() {
Assert.assertNotNull(pushConsumer.getOffsetStore());
}
@Test
public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
......@@ -201,6 +209,59 @@ public class DefaultMQPushConsumerTest {
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
@Test
public void testCheckConfig() {
DefaultMQPushConsumer pushConsumer = createPushConsumer();
pushConsumer.setPullThresholdForQueue(65535 + 1);
try {
pushConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("pullThresholdForQueue Out of range [1, 65535]");
}
pushConsumer = createPushConsumer();
pushConsumer.setPullThresholdForTopic(65535 * 100 + 1);
try {
pushConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("pullThresholdForTopic Out of range [1, 6553500]");
}
pushConsumer = createPushConsumer();
pushConsumer.setPullThresholdSizeForQueue(1024 + 1);
try {
pushConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("pullThresholdSizeForQueue Out of range [1, 1024]");
}
pushConsumer = createPushConsumer();
pushConsumer.setPullThresholdSizeForTopic(1024 * 100 + 1);
try {
pushConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("pullThresholdSizeForTopic Out of range [1, 102400]");
}
}
private DefaultMQPushConsumer createPushConsumer() {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
});
return pushConsumer;
}
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class ProcessQueueTest {
@Test
public void testCachedMessageCount() {
ProcessQueue pq = new ProcessQueue();
pq.putMessage(createMessageList());
assertThat(pq.getMsgCount().get()).isEqualTo(100);
pq.takeMessags(10);
pq.commit();
assertThat(pq.getMsgCount().get()).isEqualTo(90);
pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
assertThat(pq.getMsgCount().get()).isEqualTo(89);
}
@Test
public void testCachedMessageSize() {
ProcessQueue pq = new ProcessQueue();
pq.putMessage(createMessageList());
assertThat(pq.getMsgSize().get()).isEqualTo(100 * 123);
pq.takeMessags(10);
pq.commit();
assertThat(pq.getMsgSize().get()).isEqualTo(90 * 123);
pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
}
@Test
public void testFillProcessQueueInfo() {
ProcessQueue pq = new ProcessQueue();
pq.putMessage(createMessageList(102400));
ProcessQueueInfo processQueueInfo = new ProcessQueueInfo();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(12);
pq.takeMessags(10000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(10);
pq.takeMessags(10000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(9);
pq.takeMessags(80000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0);
}
private List<MessageExt> createMessageList() {
return createMessageList(100);
}
private List<MessageExt> createMessageList(int count) {
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
for (int i = 0; i < count; i++) {
MessageExt messageExt = new MessageExt();
messageExt.setQueueOffset(i);
messageExt.setBody(new byte[123]);
messageExtList.add(messageExt);
}
return messageExtList;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class RebalancePushImplTest {
@Spy
private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"), null);
@Mock
private MQClientInstance mqClientInstance;
@Mock
private OffsetStore offsetStore;
private String consumerGroup = "CID_RebalancePushImplTest";
private String topic = "TopicA";
@Test
public void testMessageQueueChanged_CountThreshold() {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
// Just set pullThresholdForQueue
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(1024);
// Set pullThresholdForTopic
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(512);
// Change message queue allocate result
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(341);
}
private void doRebalanceForcibly(RebalancePushImpl rebalancePush, Set<MessageQueue> allocateResultSet) {
rebalancePush.topicSubscribeInfoTable.put(topic, allocateResultSet);
rebalancePush.doRebalance(false);
rebalancePush.messageQueueChanged(topic, allocateResultSet, allocateResultSet);
}
private void init(final RebalancePushImpl rebalancePush) {
rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData());
rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
when(mqClientInstance.findConsumerIdList(anyString(), anyString())).thenReturn(Collections.singletonList(consumerGroup));
when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
doAnswer(new Answer() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class));
}
@Test
public void testMessageQueueChanged_SizeThreshold() {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
// Just set pullThresholdSizeForQueue
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(1024);
// Set pullThresholdSizeForTopic
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(512);
// Change message queue allocate result
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(341);
}
@Test
public void testMessageQueueChanged_ConsumerRuntimeInfo() throws MQClientException {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
doRebalanceForcibly(rebalancePush, allocateResultSet);
defaultMQPushConsumer.setConsumeMessageService(new ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null));
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("-1");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("-1");
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("512");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("512");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
// Change message queue allocate result
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
doRebalanceForcibly(rebalancePush, allocateResultSet);
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("341");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("341");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
}
}
\ No newline at end of file
......@@ -25,6 +25,7 @@ public class ProcessQueueInfo {
private long cachedMsgMinOffset;
private long cachedMsgMaxOffset;
private int cachedMsgCount;
private int cachedMsgSizeInMiB;
private long transactionMsgMinOffset;
private long transactionMsgMaxOffset;
......@@ -142,16 +143,24 @@ public class ProcessQueueInfo {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
public int getCachedMsgSizeInMiB() {
return cachedMsgSizeInMiB;
}
public void setCachedMsgSizeInMiB(final int cachedMsgSizeInMiB) {
this.cachedMsgSizeInMiB = cachedMsgSizeInMiB;
}
@Override
public String toString() {
return "ProcessQueueInfo [commitOffset=" + commitOffset + ", cachedMsgMinOffset="
+ cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset + ", cachedMsgCount="
+ cachedMsgCount + ", transactionMsgMinOffset=" + transactionMsgMinOffset
+ cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset
+ ", cachedMsgCount=" + cachedMsgCount + ", cachedMsgSizeInMiB=" + cachedMsgSizeInMiB
+ ", transactionMsgMinOffset=" + transactionMsgMinOffset
+ ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", transactionMsgCount="
+ transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" + tryUnlockTimes
+ ", lastLockTimestamp=" + UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
+ droped + ", lastPullTimestamp=" + UtilAll.timeMillisToHumanString(lastPullTimestamp)
+ ", lastConsumeTimestamp=" + UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
}
}
......@@ -59,6 +59,6 @@ cat /sys/block/$DISK/queue/scheduler
if [ -d ${HOME}/tmpfs ] ; then
echo "tmpfs exist, do nothing."
else
ln -s /dev/shm tmpfs
ln -s /dev/shm ${HOME}/tmpfs
echo "create tmpfs ok"
fi
......@@ -29,7 +29,7 @@ rem ============================================================================
rem JVM Configuration
rem ===========================================================================================
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g"
set "JAVA_OPT=%JAVA_OPT% -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
set "JAVA_OPT=%JAVA_OPT% -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:%USERPROFILE%\mq_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
set "JAVA_OPT=%JAVA_OPT% -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
......
......@@ -37,14 +37,14 @@ export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
......
......@@ -26,8 +26,8 @@ for %%d in (%BASE_DIR%) do set BASE_DIR=%%~dpd
set CLASSPATH=.;%BASE_DIR%conf;%CLASSPATH%
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:PermSize=128m -XX:MaxPermSize=320m"
set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -XX:-UseParNewGC"
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:"%USERPROFILE%\rmq_srv_gc.log" -XX:+PrintGCDetails"
set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages"
......
......@@ -36,12 +36,12 @@ export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
......
......@@ -42,7 +42,7 @@ public class PushConsumer {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
......
......@@ -44,7 +44,7 @@ public class Consumer {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
......
......@@ -45,7 +45,7 @@ public class SqlConsumer {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
......
......@@ -41,7 +41,7 @@ public class Consumer {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
......
......@@ -67,7 +67,7 @@ public class Consumer {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
......
......@@ -34,7 +34,7 @@ public class PullConsumer {
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: " + mq + "%n");
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
......
......@@ -37,7 +37,7 @@ public class PushConsumer {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
......
......@@ -26,7 +26,7 @@ public class TransactionCheckListenerImpl implements TransactionCheckListener {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.printf("server checking TrMsg " + msg.toString() + "%n");
System.out.printf("server checking TrMsg %s%n", msg);
int value = transactionIndex.getAndIncrement();
if ((value % 6) == 0) {
......
......@@ -92,7 +92,7 @@ public class FiltersrvStartup {
Properties properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, filtersrvConfig);
System.out.printf("load config properties file OK, " + file + "%n");
System.out.printf("load config properties file OK, %s%n", file);
in.close();
String port = properties.getProperty("listenPort");
......@@ -116,8 +116,7 @@ public class FiltersrvStartup {
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig);
if (null == filtersrvConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+ " variable in your environment to match the location of the RocketMQ installation%n");
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
......
......@@ -96,7 +96,7 @@ public class NamesrvStartup {
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + " variable in your environment to match the location of the RocketMQ installation%n");
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
......
......@@ -152,7 +152,7 @@ public class RouteInfoManager {
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
......@@ -205,7 +205,7 @@ public class RouteInfoManager {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
boolean addNewOne = true;
......
......@@ -550,7 +550,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
<version>4.0.42.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
......
......@@ -45,5 +45,22 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
<version>1.1.33.Fork22</version>
<classifier>${os.detected.classifier}</classifier>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.0.Final</version>
</extension>
</extensions>
</build>
</project>
......@@ -27,24 +27,24 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
public void updateNameServerAddressList(final List<String> addrs);
void updateNameServerAddressList(final List<String> addrs);
public List<String> getNameServerAddressList();
List<String> getNameServerAddressList();
public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
public boolean isChannelWriteable(final String addr);
boolean isChannelWritable(final String addr);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.common;
/**
* For server, three SSL modes are supported: disabled, permissive and enforcing.
* <ol>
* <li><strong>disabled:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.</li>
* <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or without SSL;</li>
* <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
* </ol>
*/
public enum SslMode {
DISABLED("disabled"),
PERMISSIVE("permissive"),
ENFORCING("enforcing");
private String name;
SslMode(String name) {
this.name = name;
}
public static SslMode parse(String mode) {
for (SslMode sslMode: SslMode.values()) {
if (sslMode.name.equals(mode)) {
return sslMode;
}
}
return PERMISSIVE;
}
public String getName() {
return name;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.ssl.SslHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/**
* <p>
* By default, file region are directly transferred to socket channel which is known as zero copy. In case we need
* to encrypt transmission, data being sent should go through the {@link SslHandler}. This encoder ensures this
* process.
* </p>
*/
public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> {
/**
* Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that
* can be handled by this encoder.
*
* @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
* io.netty.handler.codec.MessageToByteEncoder} belongs to
* @param msg the message to encode
* @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
* @throws Exception is thrown if an error occurs
*/
@Override
protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
WritableByteChannel writableByteChannel = new WritableByteChannel() {
@Override
public int write(ByteBuffer src) throws IOException {
out.writeBytes(src);
return out.capacity();
}
@Override
public boolean isOpen() {
return true;
}
@Override
public void close() throws IOException {
}
};
long toTransfer = msg.count();
while (true) {
long transferred = msg.transfered();
if (toTransfer - transferred <= 0) {
break;
}
msg.transferTo(writableByteChannel, transferred);
}
}
}
\ No newline at end of file
......@@ -38,6 +38,8 @@ public class NettyClientConfig {
private boolean clientPooledByteBufAllocatorEnable = false;
private boolean clientCloseSocketIfTimeout = false;
private boolean useTLS;
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
......@@ -125,4 +127,12 @@ public class NettyClientConfig {
public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
}
public boolean isUseTLS() {
return useTLS;
}
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
}
......@@ -20,6 +20,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
......@@ -88,6 +90,11 @@ public abstract class NettyRemotingAbstract {
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
* SSL context via which to create {@link SslHandler}.
*/
protected SslContext sslContext;
/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
*
......
......@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
......@@ -34,6 +35,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
......@@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -120,6 +123,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = SslHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (SSLException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
private static int initValueIndex() {
......@@ -151,7 +166,16 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
......@@ -421,17 +445,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
cw.getChannel().close();
channelTables.remove(addr);
}
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection = false;
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
return cw.getChannel();
cw.getChannel().close();
this.channelTables.remove(addr);
createNewConnection = true;
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
......@@ -530,10 +557,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
@Override
public boolean isChannelWriteable(String addr) {
public boolean isChannelWritable(String addr) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.isWriteable();
return cw.isWritable();
}
return true;
}
......@@ -569,7 +596,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
public boolean isWriteable() {
public boolean isWritable() {
return this.channelFuture.channel().isWritable();
}
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
......@@ -37,12 +38,15 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -50,6 +54,7 @@ import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.SslMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
......@@ -74,6 +79,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private int port = 0;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
......@@ -129,6 +138,20 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
});
}
SslMode sslMode = NettySystemConfig.sslMode;
log.info("Server is running in TLS {} mode", sslMode.getName());
if (sslMode != SslMode.DISABLED) {
try {
sslContext = SslHelper.buildSslContext(false);
log.info("SSLContext created for server");
} catch (CertificateException e) {
log.error("Failed to create SSLContext for server", e);
} catch (SSLException e) {
log.error("Failed to create SSLContext for server", e);
}
}
}
private boolean useEpoll() {
......@@ -164,13 +187,16 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(NettySystemConfig.sslMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler());
new NettyServerHandler()
);
}
});
......@@ -298,6 +324,68 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return this.publicExecutor;
}
class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final SslMode sslMode;
private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
HandshakeHandler(SslMode sslMode) {
this.sslMode = sslMode;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// mark the current position so that we can peek the first byte to determine if the content is starting with
// TLS handshake
msg.markReaderIndex();
byte b = msg.getByte(0);
if (b == HANDSHAKE_MAGIC_CODE) {
switch (sslMode) {
case DISABLED:
ctx.close();
log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
break;
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error("Trying to establish a SSL connection but sslContext is null");
}
break;
default:
log.warn("Unknown TLS mode");
break;
}
} else if (sslMode == SslMode.ENFORCING) {
ctx.close();
log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
}
// reset the reader index so that handshake negotiation may proceed as normal.
msg.resetReaderIndex();
try {
// Remove this handler
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
// Hand over this message to the next .
ctx.fireChannelRead(msg.retain());
}
}
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
......
......@@ -17,6 +17,8 @@
package org.apache.rocketmq.remoting.netty;
import org.apache.rocketmq.remoting.common.SslMode;
public class NettySystemConfig {
public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
"com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
......@@ -28,10 +30,16 @@ public class NettySystemConfig {
"com.rocketmq.remoting.clientAsyncSemaphoreValue";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
"com.rocketmq.remoting.clientOnewaySemaphoreValue";
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
Boolean
.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE =
public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE = //
"org.apache.rocketmq.remoting.ssl.mode";
public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE = //
"org.apache.rocketmq.remoting.ssl.config.file";
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
......@@ -39,4 +47,18 @@ public class NettySystemConfig {
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
public static int socketRcvbufSize =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
/**
* For server, three SSL modes are supported: disabled, permissive and enforcing.
* <ol>
* <li><strong>disabled:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.</li>
* <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or without SSL;</li>
* <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
* </ol>
*/
public static SslMode sslMode = //
SslMode.parse(System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE, "disabled"));
public static String sslConfigFile = //
System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE, "/etc/rocketmq/ssl.properties");
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.cert.CertificateException;
import java.util.Properties;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SslHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
public static SslContext buildSslContext(boolean forClient) throws SSLException, CertificateException {
File configFile = new File(NettySystemConfig.sslConfigFile);
boolean testMode = !(configFile.exists() && configFile.isFile() && configFile.canRead());
Properties properties = null;
if (!testMode) {
properties = new Properties();
InputStream inputStream = null;
try {
inputStream = new FileInputStream(configFile);
properties.load(inputStream);
} catch (FileNotFoundException ignore) {
} catch (IOException ignore) {
} finally {
if (null != inputStream) {
try {
inputStream.close();
} catch (IOException ignore) {
}
}
}
}
SslProvider provider = null;
if (OpenSsl.isAvailable()) {
provider = SslProvider.OPENSSL;
LOGGER.info("Using OpenSSL provider");
} else {
provider = SslProvider.JDK;
LOGGER.info("Using JDK SSL provider");
}
if (forClient) {
if (testMode) {
return SslContextBuilder
.forClient()
.sslProvider(SslProvider.JDK)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
} else {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);
if ("false".equals(properties.getProperty("client.auth.server"))) {
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
} else {
if (properties.containsKey("client.trustManager")) {
sslContextBuilder.trustManager(new File(properties.getProperty("client.trustManager")));
}
}
return sslContextBuilder.keyManager(
properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null,
properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.keyFile")) : null,
properties.containsKey("client.password") ? properties.getProperty("client.password") : null)
.build();
}
} else {
if (testMode) {
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
return SslContextBuilder
.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
.sslProvider(SslProvider.JDK)
.clientAuth(ClientAuth.OPTIONAL)
.build();
} else {
return SslContextBuilder.forServer(
properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null,
properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.keyFile")) : null,
properties.containsKey("server.password") ? properties.getProperty("server.password") : null)
.sslProvider(provider)
.trustManager(new File(properties.getProperty("server.trustManager")))
.clientAuth(parseClientAuthMode(properties.getProperty("server.auth.client")))
.build();
}
}
}
private static ClientAuth parseClientAuthMode(String authMode) {
if (null == authMode || authMode.trim().isEmpty()) {
return ClientAuth.NONE;
}
if ("optional".equalsIgnoreCase(authMode)) {
return ClientAuth.OPTIONAL;
}
return ClientAuth.REQUIRE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;
public class FileRegionEncoderTest {
/**
* This unit test case ensures that {@link FileRegionEncoder} indeed wraps {@link FileRegion} to
* {@link ByteBuf}.
* @throws IOException if there is an error.
*/
@Test
public void testEncode() throws IOException {
FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
file.deleteOnExit();
Random random = new Random(System.currentTimeMillis());
int dataLength = 1 << 10;
byte[] data = new byte[dataLength];
random.nextBytes(data);
write(file, data);
FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
Assert.assertEquals(0, fileRegion.transfered());
Assert.assertEquals(dataLength, fileRegion.count());
Assert.assertTrue(channel.writeOutbound(fileRegion));
ByteBuf out = (ByteBuf) channel.readOutbound();
byte[] arr = new byte[out.readableBytes()];
out.getBytes(0, arr);
Assert.assertArrayEquals("Data should be identical", data, arr);
}
/**
* Write byte array to the specified file.
*
* @param file File to write to.
* @param data byte array to write.
* @throws IOException in case there is an exception.
*/
private static void write(File file, byte[] data) throws IOException {
BufferedOutputStream bufferedOutputStream = null;
try {
bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(file, false));
bufferedOutputStream.write(data);
bufferedOutputStream.flush();
} finally {
if (null != bufferedOutputStream) {
bufferedOutputStream.close();
}
}
}
}
\ No newline at end of file
......@@ -185,7 +185,7 @@ public class CommitLog {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName());
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
......@@ -403,7 +403,7 @@ public class CommitLog {
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this maped file " + mappedFile.getFileName());
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
......@@ -445,7 +445,7 @@ public class CommitLog {
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last maped file " + mappedFile.getFileName());
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
......@@ -571,7 +571,7 @@ public class CommitLog {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
......@@ -586,7 +586,7 @@ public class CommitLog {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
......@@ -721,7 +721,7 @@ public class CommitLog {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("Create maped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
......@@ -736,7 +736,7 @@ public class CommitLog {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("Create maped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
......
......@@ -121,7 +121,7 @@ public class ConsumeQueue {
index++;
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last maped file "
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
......
......@@ -416,12 +416,8 @@ public class DefaultMessageStore implements MessageStore {
long begin = this.getCommitLog().getBeginTimeInLock();
long diff = this.systemClock.now() - begin;
if (diff < 10000000
&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
return true;
}
return false;
return diff < 10000000
&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
@Override
......
......@@ -19,7 +19,7 @@ package org.apache.rocketmq.store;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Spin lock Implementation to put message, suggest using this witb low race conditions
* Spin lock Implementation to put message, suggest using this with low race conditions
*/
public class PutMessageSpinLock implements PutMessageLock {
//true: Can lock, false : in lock.
......
......@@ -29,8 +29,8 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
protected DefaultMQPushConsumer consumer = null;
public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
String consumerGroup, AbstractListener listner) {
super(nsAddr, topic, subExpression, consumerGroup, listner);
String consumerGroup, AbstractListener listener) {
super(nsAddr, topic, subExpression, consumerGroup, listener);
}
public AbstractListener getListener() {
......@@ -42,6 +42,10 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
}
public void create() {
create(false);
}
public void create(boolean useTLS) {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr);
......@@ -52,6 +56,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
e.printStackTrace();
}
consumer.setMessageListener(listener);
consumer.setUseTLS(useTLS);
}
public void start() {
......
......@@ -34,20 +34,29 @@ public class RMQNormalProducer extends AbstractMQProducer {
private String nsAddr = null;
public RMQNormalProducer(String nsAddr, String topic) {
this(nsAddr, topic, false);
}
public RMQNormalProducer(String nsAddr, String topic, boolean useTLS) {
super(topic);
this.nsAddr = nsAddr;
create();
create(useTLS);
start();
}
public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
String producerInstanceName) {
this(nsAddr, topic, producerGroupName, producerInstanceName, false);
}
public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
String producerInstanceName, boolean useTLS) {
super(topic);
this.producerGroupName = producerGroupName;
this.producerInstanceName = producerInstanceName;
this.nsAddr = nsAddr;
create();
create(useTLS);
start();
}
......@@ -59,17 +68,18 @@ public class RMQNormalProducer extends AbstractMQProducer {
this.producer = producer;
}
protected void create() {
protected void create(boolean useTLS) {
producer = new DefaultMQProducer();
producer.setProducerGroup(getProducerGroupName());
producer.setInstanceName(getProducerInstanceName());
producer.setUseTLS(useTLS);
if (nsAddr != null) {
producer.setNamesrvAddr(nsAddr);
}
}
public void start() {
try {
producer.start();
......@@ -83,10 +93,10 @@ public class RMQNormalProducer extends AbstractMQProducer {
public SendResult send(Object msg, Object orderKey) {
org.apache.rocketmq.client.producer.SendResult metaqResult = null;
Message metaqMsg = (Message) msg;
Message message = (Message) msg;
try {
long start = System.currentTimeMillis();
metaqResult = producer.send(metaqMsg);
metaqResult = producer.send(message);
this.msgRTs.addData(System.currentTimeMillis() - start);
if (isDebug) {
logger.info(metaqResult);
......@@ -94,9 +104,9 @@ public class RMQNormalProducer extends AbstractMQProducer {
sendResult.setMsgId(metaqResult.getMsgId());
sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
msgBodys.addData(new String(metaqMsg.getBody()));
msgBodys.addData(new String(message.getBody()));
originMsgs.addData(msg);
originMsgIndex.put(new String(metaqMsg.getBody()), metaqResult);
originMsgIndex.put(new String(message.getBody()), metaqResult);
} catch (Exception e) {
if (isDebug) {
e.printStackTrace();
......
......@@ -69,8 +69,8 @@ public abstract class AbstractMQConsumer implements MQConsumer {
return listener;
}
public void setListener(AbstractListener listener) {
this.listener = listener;
public void setListener(AbstractListener listner) {
this.listener = listner;
}
public String getNsAddr() {
......
......@@ -20,6 +20,8 @@ package org.apache.rocketmq.test.clientinterface;
public interface MQConsumer {
void create();
void create(boolean useTLS);
void start();
void shutdown();
......
......@@ -27,10 +27,16 @@ public class ConsumerFactory {
public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
AbstractListener listner) {
AbstractListener listener) {
return getRMQNormalConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
AbstractListener listener, boolean useTLS) {
RMQNormalConsumer consumer = new RMQNormalConsumer(nsAddr, topic, subExpression,
consumerGroup, listner);
consumer.create();
consumerGroup, listener);
consumer.create(useTLS);
consumer.start();
return consumer;
}
......
......@@ -25,24 +25,24 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.listener.AbstractListener;
public class RMQNormalListner extends AbstractListener implements MessageListenerConcurrently {
public class RMQNormalListener extends AbstractListener implements MessageListenerConcurrently {
private ConsumeConcurrentlyStatus consumeStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
private AtomicInteger msgIndex = new AtomicInteger(0);
public RMQNormalListner() {
public RMQNormalListener() {
super();
}
public RMQNormalListner(String listnerName) {
super(listnerName);
public RMQNormalListener(String listenerName) {
super(listenerName);
}
public RMQNormalListner(ConsumeConcurrentlyStatus consumeStatus) {
public RMQNormalListener(ConsumeConcurrentlyStatus consumeStatus) {
super();
this.consumeStatus = consumeStatus;
}
public RMQNormalListner(String originMsgCollector, String msgBodyCollector) {
public RMQNormalListener(String originMsgCollector, String msgBodyCollector) {
super(originMsgCollector, msgBodyCollector);
}
......@@ -51,7 +51,7 @@ public class RMQNormalListner extends AbstractListener implements MessageListene
for (MessageExt msg : msgs) {
msgIndex.getAndIncrement();
if (isDebug) {
if (listenerName != null && listenerName != "") {
if (listenerName != null && !listenerName.isEmpty()) {
logger.info(listenerName + ":" + msgIndex.get() + ":"
+ String.format("msgid:%s broker:%s queueId:%s offset:%s",
msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(),
......
......@@ -31,8 +31,6 @@ import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.Assert;
public class BaseConf {
protected static String nsAddr;
......@@ -82,7 +80,11 @@ public class BaseConf {
}
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic);
return getProducer(nsAddr, topic, false);
}
public static RMQNormalProducer getProducer(String nsAddr, String topic, boolean useTLS) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, useTLS);
if (debug) {
producer.setDebug();
}
......@@ -111,15 +113,25 @@ public class BaseConf {
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
AbstractListener listner) {
AbstractListener listener) {
return getConsumer(nsAddr, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
AbstractListener listener, boolean useTLS) {
String consumerGroup = initConsumerGroup();
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listner);
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
String subExpression, AbstractListener listener) {
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
String subExpression, AbstractListener listner) {
String subExpression, AbstractListener listener, boolean useTLS) {
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
topic, subExpression, listner);
topic, subExpression, listener, useTLS);
if (debug) {
consumer.setDebug();
}
......@@ -129,7 +141,7 @@ public class BaseConf {
return consumer;
}
public static void shutDown() {
public static void shutdown() {
try {
for (Object mqClient : mqClients) {
if (mqClient instanceof AbstractMQProducer) {
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -45,15 +45,15 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testTwoConsumerAndCrashOne() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
producer.send(msgSize);
......@@ -79,11 +79,11 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
@Test
public void test3ConsumerAndCrashOne() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
producer.send(msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
......@@ -46,15 +46,15 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testTwoConsumersBalance() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......@@ -75,13 +75,13 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
@Test
public void testFourConsumersBalance() {
int msgSize = 600;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -46,7 +46,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -55,7 +55,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
new RMQNormalListener(group + "_1"));
Thread.sleep(3000);
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
......@@ -66,7 +66,7 @@ public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
.containsExactlyElementsIn(producer.getAllMsgBody());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), waitTime);
assertThat(consumer2.getListener().getAllMsgBody().size()).isEqualTo(0);
}
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -56,9 +56,9 @@ public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
new RMQNormalListener(group + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -22,7 +22,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -55,10 +55,10 @@ public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT {
int msgSize = 16;
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*",
new RMQNormalListner());
new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*",
new RMQNormalListner(ConsumeConcurrentlyStatus.RECONSUME_LATER));
new RMQNormalListener(ConsumeConcurrentlyStatus.RECONSUME_LATER));
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -47,7 +47,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -56,7 +56,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
new RMQNormalListener(group + "_1"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......@@ -71,7 +71,7 @@ public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
consumer1.clearMsg();
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -47,7 +47,7 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -57,9 +57,9 @@ public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT {
String group1 = initConsumerGroup();
String group2 = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group1, topic, "*",
new RMQNormalListner(group1 + "_1"));
new RMQNormalListener(group1 + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, group2, topic, "*",
new RMQNormalListner(group2 + "_2"));
new RMQNormalListener(group2 + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -47,7 +47,7 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -56,9 +56,9 @@ public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT {
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
new RMQNormalListener(group + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListener(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -49,7 +49,7 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -55,9 +55,9 @@ public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT {
String tag2 = "jueyin_tag_2";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag1,
new RMQNormalListner());
new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListner());
consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag2, msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -54,9 +54,9 @@ public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT {
String tag = "jueyin_tag";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*",
new RMQNormalListner());
new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner());
consumer1.getConsumerGroup(), topic, tag, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag, msgSize);
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -45,7 +45,7 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -54,9 +54,9 @@ public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT {
String tag = "jueyin_tag";
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag,
new RMQNormalListner());
new RMQNormalListener());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner());
consumer1.getConsumerGroup(), topic, tag, new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(tag, msgSize);
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
......@@ -46,20 +46,20 @@ public class DynamicAddAndCrashIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAddOneConsumerAndCrashAfterWhile() {
int msgSize = 150;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
consumer2.shutdown();
......@@ -76,16 +76,16 @@ public class DynamicAddAndCrashIT extends BaseConf {
@Test
public void testAddTwoConsumerAndCrashAfterWhile() {
int msgSize = 150;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
consumer2.shutdown();
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
......@@ -46,20 +46,20 @@ public class DynamicAddConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAddOneConsumer() {
int msgSize = 100;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
asyncDefaultMQProducer.waitSendAll(waitTime * 6);
......@@ -74,16 +74,16 @@ public class DynamicAddConsumerIT extends BaseConf {
@Test
public void testAddTwoConsumer() {
int msgSize = 100;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
TestUtils.waitForSeconds(waitTime);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
asyncDefaultMQProducer.waitSendAll(waitTime * 6);
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.After;
......@@ -46,15 +46,15 @@ public class DynamicCrashConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAddOneConsumer() {
int msgSize = 100;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
......@@ -75,11 +75,11 @@ public class DynamicCrashConsumerIT extends BaseConf {
@Test
public void testAddTwoConsumer() {
int msgSize = 100;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
asyncDefaultMQProducer.start();
......
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -49,7 +49,7 @@ public class SqlFilterIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -58,7 +58,7 @@ public class SqlFilterIT extends BaseConf {
String group = initConsumerGroup();
MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListner(group + "_1"));
RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListener(group + "_1"));
Thread.sleep(3000);
producer.send("TagA", msgSize);
producer.send("TagB", msgSize);
......
......@@ -24,7 +24,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.factory.TagMessage;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -49,7 +49,7 @@ public class MulTagSubIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -58,7 +58,7 @@ public class MulTagSubIT extends BaseConf {
String subExpress = String.format("%s||jueyin2", tag);
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -75,7 +75,7 @@ public class MulTagSubIT extends BaseConf {
String subExpress = String.format("%s||noExistTag", tag2);
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag1, msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
......@@ -98,7 +98,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
......@@ -119,7 +119,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
......@@ -141,7 +141,7 @@ public class MulTagSubIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tagMessage.getMixedTagMessages());
Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -47,14 +47,14 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testTagSmoke() {
String tag = "jueyin";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -68,7 +68,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExprress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExprress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -84,7 +84,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -100,7 +100,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -116,7 +116,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = null;
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -133,7 +133,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = "*";
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
......@@ -156,7 +156,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = null;
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
......@@ -178,7 +178,7 @@ public class TagMessageWith1ConsumerIT extends BaseConf {
String subExpress = tag2;
int msgSize = 10;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
......
......@@ -25,7 +25,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.factory.TagMessage;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -49,7 +49,7 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -58,9 +58,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
String tag2 = "jueyin2";
int msgSize = 10;
RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tag1,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tag2,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
producer.send(tag1Msgs);
......@@ -89,9 +89,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tags[0],
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tags[1],
new RMQNormalListner());
new RMQNormalListener());
List<Object> tagMsgs = tagMessage.getMixedTagMessages();
producer.send(tagMsgs);
......@@ -120,9 +120,9 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, sub1,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, sub2,
new RMQNormalListner());
new RMQNormalListener());
List<Object> tagMsgs = tagMessage.getMixedTagMessages();
producer.send(tagMsgs);
......@@ -152,13 +152,13 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
int msgSize = 10;
RMQNormalConsumer consumerSubTwoMatchAll = getConsumer(nsAddr, topic, sub1,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerSubTwoMachieOne = getConsumer(nsAddr, topic, sub2,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerSubTag1 = getConsumer(nsAddr, topic, sub3,
new RMQNormalListner());
new RMQNormalListener());
RMQNormalConsumer consumerSubAll = getConsumer(nsAddr, topic, sub4,
new RMQNormalListner());
new RMQNormalListener());
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.RandomUtils;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
......@@ -47,7 +47,7 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -56,9 +56,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
String originMsgDCName = RandomUtils.getStringByUUID();
String msgBodyDCName = RandomUtils.getStringByUUID();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -75,11 +75,11 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
String msgBodyDCName = RandomUtils.getStringByUUID();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100);
TestUtils.waitForMoment(5);
getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
TestUtils.waitForMoment(5);
consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
......@@ -95,9 +95,9 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
String msgBodyDCName = RandomUtils.getStringByUUID();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
new RMQNormalListener(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100);
TestUtils.waitForMoment(5);
......
......@@ -21,7 +21,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.junit.After;
import org.junit.Assert;
......@@ -40,7 +40,7 @@ public class MulConsumerMulTopicIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -48,10 +48,10 @@ public class MulConsumerMulTopicIT extends BaseConf {
int msgSize = 10;
String topic1 = initTopic();
String topic2 = initTopic();
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer1.subscribe(topic2, "*");
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic1,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
consumer2.subscribe(topic2, "*");
producer.send(MQMessageFactory.getMsg(topic1, msgSize));
......@@ -69,10 +69,10 @@ public class MulConsumerMulTopicIT extends BaseConf {
String topic1 = initTopic();
String topic2 = initTopic();
String tag = "jueyin_tag";
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer1.subscribe(topic2, tag);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic1,
"*", new RMQNormalListner());
"*", new RMQNormalListener());
consumer2.subscribe(topic2, tag);
producer.send(MQMessageFactory.getMsg(topic1, msgSize));
......@@ -91,9 +91,9 @@ public class MulConsumerMulTopicIT extends BaseConf {
String topic2 = initTopic();
String tag1 = "jueyin_tag_1";
String tag2 = "jueyin_tag_2";
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer1.subscribe(topic2, tag1);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer2.subscribe(topic2, tag1);
producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2));
......
......@@ -21,7 +21,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -40,7 +40,7 @@ public class OneConsumerMulTopicIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......@@ -48,7 +48,7 @@ public class OneConsumerMulTopicIT extends BaseConf {
int msgSize = 10;
String topic1 = initTopic();
String topic2 = initTopic();
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer.subscribe(topic2, "*");
producer.send(MQMessageFactory.getMsg(topic1, msgSize));
......@@ -67,7 +67,7 @@ public class OneConsumerMulTopicIT extends BaseConf {
String topic1 = initTopic();
String topic2 = initTopic();
String tag = "jueyin_tag";
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer.subscribe(topic2, tag);
producer.send(MQMessageFactory.getMsg(topic1, msgSize));
......@@ -87,7 +87,7 @@ public class OneConsumerMulTopicIT extends BaseConf {
String topic2 = initTopic();
String tag1 = "jueyin_tag_1";
String tag2 = "jueyin_tag_2";
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListener());
consumer.subscribe(topic2, tag1);
producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2));
......
......@@ -50,7 +50,7 @@ public class AsyncSendExceptionIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -45,14 +45,14 @@ public class AsyncSendWithMessageQueueIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAsyncSendWithMQ() {
int msgSize = 20;
int queueId = 0;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
producer.asyncSend(msgSize, mq);
......
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -48,14 +48,14 @@ public class AsyncSendWithMessageQueueSelectorIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testSendWithSelector() {
int msgSize = 20;
final int queueId = 0;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.asyncSend(msgSize, new MessageQueueSelector() {
@Override
......
......@@ -22,7 +22,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -44,13 +44,13 @@ public class AsyncSendWithOnlySendCallBackIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testSendWithOnlyCallBack() {
int msgSize = 20;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.asyncSend(msgSize);
producer.waitForResponse(10 * 1000);
assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
......
......@@ -47,7 +47,7 @@ public class BatchSendIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -24,7 +24,7 @@ import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.factory.MessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -45,7 +45,7 @@ public class MessageUserPropIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
/**
......@@ -58,7 +58,7 @@ public class MessageUserPropIT extends BaseConf {
String msgValue = "jueyinValue";
msg.putUserProperty(msgKey, msgValue);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.send(msg, null);
assertThat(producer.getAllMsgBody().size()).isEqualTo(1);
......@@ -80,7 +80,7 @@ public class MessageUserPropIT extends BaseConf {
String msgValue = "jueyinzhi";
msg.putUserProperty(msgKey, msgValue);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.send(msg, null);
assertThat(producer.getAllMsgBody().size()).isEqualTo(1);
......
......@@ -39,7 +39,7 @@ public class ProducerGroupAndInstanceNameValidityIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
/**
......
......@@ -44,7 +44,7 @@ public class OneWaySendExceptionIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test(expected = java.lang.NullPointerException.class)
......
......@@ -22,7 +22,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -44,13 +44,13 @@ public class OneWaySendIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testOneWaySendWithOnlyMsgAsParam() {
int msgSize = 20;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.sendOneWay(msgSize);
producer.waitForResponse(5 * 1000);
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -46,14 +46,14 @@ public class OneWaySendWithMQIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testAsyncSendWithMQ() {
int msgSize = 20;
int queueId = 0;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
producer.sendOneWay(msgSize, mq);
......
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
......@@ -49,14 +49,14 @@ public class OneWaySendWithSelectorIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
public void testSendWithSelector() {
int msgSize = 20;
final int queueId = 0;
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.sendOneWay(msgSize, new MessageQueueSelector() {
@Override
......
......@@ -48,7 +48,7 @@ public class OrderMsgDynamicRebalanceIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -49,7 +49,7 @@ public class OrderMsgIT extends BaseConf {
@After
public void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -48,7 +48,7 @@ public class OrderMsgRebalanceIT extends BaseConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -47,7 +47,7 @@ public class OrderMsgWithTagIT extends BaseConf {
@After
public void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -42,7 +42,7 @@ public class QueryMsgByIdExceptionIT extends BaseConf {
@AfterClass
public static void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
......@@ -44,12 +44,12 @@ public class QueryMsgByIdIT extends BaseConf {
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic);
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
}
@After
public void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -45,7 +45,7 @@ public class QueryMsgByKeyIT extends BaseConf {
@After
public void tearDown() {
shutDown();
shutdown();
}
@Test
......
......@@ -47,7 +47,7 @@ public class NormalMsgDelayIT extends DelayConf {
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
......@@ -21,7 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
......@@ -41,12 +41,12 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic);
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
}
@After
public void tearDown() {
super.shutDown();
super.shutdown();
}
@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.tls;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TLS_IT extends BaseConf {
private RMQNormalProducer producer;
private RMQNormalConsumer consumer;
private String topic;
@Before
public void setUp() {
topic = initTopic();
// Send messages via TLS
producer = getProducer(nsAddr, topic, true);
// Receive messages via TLS
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), true);
}
@After
public void tearDown() {
shutdown();
}
@Test
public void testSendAndReceiveMessageOverTLS() {
int numberOfMessagesToSend = 16;
producer.send(numberOfMessagesToSend);
boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener());
Assertions.assertThat(consumedAll).isEqualTo(true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.tls;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TLS_Mix2_IT extends BaseConf {
private RMQNormalProducer producer;
private RMQNormalConsumer consumer;
private String topic;
@Before
public void setUp() {
topic = initTopic();
// send message via TLS
producer = getProducer(nsAddr, topic, true);
// Receive message without TLS.
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), false);
}
@After
public void tearDown() {
shutdown();
}
@Test
public void testSendAndReceiveMessageOverTLS() {
int numberOfMessagesToSend = 16;
producer.send(numberOfMessagesToSend);
boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener());
Assertions.assertThat(consumedAll).isEqualTo(true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.tls;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TLS_Mix_IT extends BaseConf {
private RMQNormalProducer producer;
private RMQNormalConsumer consumer;
private String topic;
@Before
public void setUp() {
topic = initTopic();
// send message without TLS
producer = getProducer(nsAddr, topic);
// Receive message via TLS
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener(), true);
}
@After
public void tearDown() {
shutdown();
}
@Test
public void testSendAndReceiveMessageOverTLS() {
int numberOfMessagesToSend = 16;
producer.send(numberOfMessagesToSend);
boolean consumedAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener());
Assertions.assertThat(consumedAll).isEqualTo(true);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册