提交 0de84e20 编写于 作者: S stevenschew

[ROCKETMQ-57] Add license

notifications:
email:
recipients:
- dev@rocketmq.incubator.apache.org
on_success: change
on_failure: always
language: java
jdk:
- oraclejdk8
- oraclejdk7
- openjdk7
matrix:
include:
# On OSX, run with default JDK only.
# - os: osx
# On Linux, run with specific JDKs only.
- os: linux
env: CUSTOM_JDK="oraclejdk8"
- os: linux
env: CUSTOM_JDK="oraclejdk7"
- os: linux
env: CUSTOM_JDK="openjdk7"
before_install:
- echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
- cat ~/.mavenrc
- if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
- if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
#os:
# - linux
# - osx
#jdk:
# - oraclejdk8
# - oraclejdk7
# - openjdk7
script:
- travis_retry mvn -B clean apache-rat:check
- travis_retry mvn -B package jacoco:report coveralls:report
after_success:
- mvn sonar:sonar
## RocketMQ [![Build Status](https://travis-ci.org/apache/incubator-rocketmq.svg?branch=master)](https://travis-ci.org/apache/incubator-rocketmq)
## RocketMQ [![Build Status](https://travis-ci.org/apache/incubator-rocketmq.svg?branch=master)](https://travis-ci.org/apache/incubator-rocketmq) [![Coverage Status](https://coveralls.io/repos/github/apache/incubator-rocketmq/badge.svg?branch=master)](https://coveralls.io/github/apache/incubator-rocketmq?branch=master)
[![Maven Central](https://img.shields.io/badge/maven--center-stable--version-green.svg)](http://search.maven.org/#search%7Cga%7C1%7Corg.apache.rocketmq)
[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.org/apache/rocketmqreleases)
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.broker;
import java.io.File;
import java.util.Random;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerTestHarness {
public final String BROKER_NAME = "TestBrokerName";
protected BrokerController brokerController = null;
protected Random random = new Random();
protected String brokerAddr = "";
protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class);
protected BrokerConfig brokerConfig = new BrokerConfig();
protected NettyServerConfig nettyServerConfig = new NettyServerConfig();
protected NettyClientConfig nettyClientConfig = new NettyClientConfig();
protected MessageStoreConfig storeConfig = new MessageStoreConfig();
@Before
public void startup() throws Exception {
brokerConfig.setBrokerName(BROKER_NAME);
brokerConfig.setBrokerIP1("127.0.0.1");
storeConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore");
storeConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog");
nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
brokerAddr = brokerConfig.getBrokerIP1() + ":" + nettyServerConfig.getListenPort();
brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
boolean initResult = brokerController.initialize();
Assert.assertTrue(initResult);
logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
brokerController.start();
}
@After
public void shutdown() throws Exception {
if (brokerController != null) {
brokerController.shutdown();
}
//maybe need to clean the file store. But we do not suggest deleting anything.
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.api;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.junit.Assert;
import org.junit.Test;
public class BrokerFastFailureTest extends BrokerTestHarness {
@Test
public void testHeadSlowTimeMills() throws InterruptedException {
BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue.add(new FutureTaskExt<>(new RequestTask(null, null, null), null));
TimeUnit.MILLISECONDS.sleep(10);
Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) > 0);
blockingQueue.clear();
blockingQueue.add(new Runnable() {
@Override public void run() {
}
});
Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) == 0);
}
@Test
public void testCastRunnable() {
Runnable runnable = new Runnable() {
@Override public void run() {
}
};
Assert.assertNull(BrokerFastFailure.castRunnable(runnable));
RequestTask requestTask = new RequestTask(null, null, null);
runnable = new FutureTaskExt<>(requestTask, null);
Assert.assertEquals(requestTask, BrokerFastFailure.castRunnable(runnable));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.broker.api;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class SendMessageTest extends BrokerTestHarness {
MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
String topic = "UnitTestTopic";
@Before
@Override
public void startup() throws Exception {
super.startup();
client.start();
}
@After
@Override
public void shutdown() throws Exception {
client.shutdown();
super.shutdown();
}
@Test
public void testSendSingle() throws Exception {
Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup("abc");
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
requestHeader.setDefaultTopicQueueNums(4);
requestHeader.setQueueId(0);
requestHeader.setSysFlag(0);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
CommunicationMode.SYNC, new SendMessageContext(), null);
assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.broker.offset;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ConsumerOffsetManagerTest extends BrokerTestHarness {
@Test
public void testFlushConsumerOffset() throws Exception {
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
for (int i = 0; i < 10; i++) {
String group = "UNIT_TEST_GROUP_" + i;
for (int id = 0; id < 10; id++) {
consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, id + 100);
consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, id + 100);
consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, id + 100);
}
}
consumerOffsetManager.persist();
consumerOffsetManager.getOffsetTable().clear();
for (int i = 0; i < 10; i++) {
String group = "UNIT_TEST_GROUP_" + i;
for (int id = 0; id < 10; id++) {
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), -1);
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1);
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1);
}
}
consumerOffsetManager.load();
for (int i = 0; i < 10; i++) {
String group = "UNIT_TEST_GROUP_" + i;
for (int id = 0; id < 10; id++) {
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), id + 100);
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100);
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.broker.topic;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class TopicConfigManagerTest extends BrokerTestHarness {
@Test
public void testFlushTopicConfig() throws Exception {
TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
assertNotNull(topicConfig);
}
topicConfigManager.persist();
topicConfigManager.getTopicConfigTable().clear();
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
assertNull(topicConfig);
}
topicConfigManager.load();
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
assertNotNull(topicConfig);
assertEquals(topicConfig.getTopicSysFlag(), 0);
assertEquals(topicConfig.getReadQueueNums(), 4);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private DefaultMQPullConsumer pullConsumer;
private String consumerGroup = "FooBarGroup";
private String topic = "FooBar";
private String brokerName = "BrokerA";
@Before
public void init() throws Exception {
pullConsumer = new DefaultMQPullConsumer(consumerGroup);
pullConsumer.setNamesrvAddr("127.0.0.1:9876");
pullConsumer.start();
PullAPIWrapper pullAPIWrapper = pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
Field field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pullAPIWrapper, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
}
@After
public void terminate() {
pullConsumer.shutdown();
}
@Test
public void testPullMessage_Success() throws Exception {
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
return createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
}
}).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class));
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
assertThat(pullResult).isNotNull();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
assertThat(pullResult.getMinOffset()).isEqualTo(123);
assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<>());
}
@Test
public void testPullMessage_NotFound() throws Exception{
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
return createPullResult(requestHeader, PullStatus.NO_NEW_MSG, new ArrayList<MessageExt>());
}
}).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class));
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
}
@Test
public void testPullMessageAsync_Success() throws Exception {
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
PullCallback pullCallback = mock.getArgument(4);
pullCallback.onSuccess(pullResult);
return null;
}
}).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class));
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() {
@Override public void onSuccess(PullResult pullResult) {
assertThat(pullResult).isNotNull();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
assertThat(pullResult.getMinOffset()).isEqualTo(123);
assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
assertThat(pullResult.getMsgFoundList()).isEqualTo(new ArrayList<>());
}
@Override public void onException(Throwable e) {
}
});
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws Exception {
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {});
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private DefaultMQPushConsumer pushConsumer;
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
});
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
mQClientFactory.start();
}
@After
public void terminate() {
pushConsumer.shutdown();
}
@Test
public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
messageExts[0] = msgs.get(0);
countDownLatch.countDown();
return null;
}
}));
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
return null;
}
}).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class));
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await();
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(1024);
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
pullRequest.setMessageQueue(messageQueue);
pullRequest.setProcessQueue(new ProcessQueue());
return pullRequest;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
}
\ No newline at end of file
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.consumer.store;
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import org.apache.rocketmq.client.ClientConfig;
......@@ -40,7 +41,7 @@ public class LocalFileOffsetStoreTest {
@Before
public void init() {
System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + File.separator + ".rocketmq_offsets");
String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
when(mQClientFactory.getClientId()).thenReturn(clientId);
}
......
......@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private String topic = "FooBar";
private String group = "FooBarGroup";
......@@ -53,7 +53,7 @@ public class MQClientInstanceTest {
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(0L, "127.0.0.1");
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
......@@ -61,7 +61,7 @@ public class MQClientInstanceTest {
List<QueueData> queueDataList = new ArrayList<>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(2);
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private DefaultMQProducer producer;
private Message message;
private Message zeroMsg;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
@Before
public void init() throws Exception {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
message = new Message(topic, new byte[] {'a'});
zeroMsg = new Message(topic, new byte[] {});
producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
@After
public void terminate() {
producer.shutdown();
}
@Test
public void testSendMessage_ZeroMessage() throws InterruptedException, RemotingException, MQBrokerException {
try {
producer.send(zeroMsg);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("message body length is zero");
}
}
@Test
public void testSendMessage_NoNameSrv() throws RemotingException, InterruptedException, MQBrokerException {
when(mQClientAPIImpl.getNameServerAddressList()).thenReturn(new ArrayList<String>());
try {
producer.send(message);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("No name server address");
}
}
@Test
public void testSendMessage_NoRoute() throws RemotingException, InterruptedException, MQBrokerException {
when(mQClientAPIImpl.getNameServerAddressList()).thenReturn(Collections.singletonList("127.0.0.1:9876"));
try {
producer.send(message);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("No route info of this topic");
}
}
@Test
public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendResult sendResult = producer.send(message);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Test
public void testSendMessageSync_SuccessWithHook() throws Throwable {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final Throwable[] assertionErrors = new Throwable[1];
final CountDownLatch countDownLatch = new CountDownLatch(2);
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
@Override public String hookName() {
return "TestHook";
}
@Override public void sendMessageBefore(final SendMessageContext context) {
assertionErrors[0] = assertInOtherThread(new Runnable() {
@Override public void run() {
assertThat(context.getMessage()).isEqualTo(message);
assertThat(context.getProducer()).isEqualTo(producer);
assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
assertThat(context.getSendResult()).isNull();
}
});
countDownLatch.countDown();
}
@Override public void sendMessageAfter(final SendMessageContext context) {
assertionErrors[0] = assertInOtherThread(new Runnable() {
@Override public void run() {
assertThat(context.getMessage()).isEqualTo(message);
assertThat(context.getProducer()).isEqualTo(producer.getDefaultMQProducerImpl());
assertThat(context.getCommunicationMode()).isEqualTo(CommunicationMode.SYNC);
assertThat(context.getSendResult()).isNotNull();
}
});
countDownLatch.countDown();
}
});
SendResult sendResult = producer.send(message);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
countDownLatch.await();
if (assertionErrors[0] != null) {
throw assertionErrors[0];
}
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
private Throwable assertInOtherThread(final Runnable runnable) {
final Throwable[] assertionErrors = new Throwable[1];
Thread thread = new Thread(new Runnable() {
@Override public void run() {
try {
runnable.run();
} catch (AssertionError e) {
assertionErrors[0] = e;
}
}
});
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
assertionErrors[0] = e;
}
return assertionErrors[0];
}
}
\ No newline at end of file
......@@ -32,7 +32,7 @@
<artifactId>rocketmq-all</artifactId>
<version>4.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>rocketmq-all ${project.version}</name>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.incubator.apache.org/</url>
<prerequisites>
......@@ -42,7 +42,8 @@
<scm>
<url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git</url>
<connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git</developerConnection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git
</developerConnection>
</scm>
<mailingLists>
......@@ -151,12 +152,22 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--maven properties -->
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Maven properties -->
<maven.test.skip>false</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip>
<!-- compiler settings properties -->
<!-- Compiler settings properties -->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<!-- Overwritten by the test configuration,otherwise the JaCoCo agent cannot be attached.Details see http://www.eclemma.org/jacoco/trunk/doc/prepare-agent-mojo.html -->
<argLine>-Xms512m -Xmx1024m</argLine>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<!-- URL of the ASF SonarQube server -->
<sonar.host.url>https://builds.apache.org/analysis</sonar.host.url>
<!-- Exclude all generated code -->
<sonar.exclusions>file:**/generated-sources/**</sonar.exclusions>
</properties>
<modules>
......@@ -229,12 +240,21 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<argLine>-Xms512m -Xmx1024m</argLine>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
</configuration>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
......@@ -317,6 +337,52 @@
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.eluder.coveralls</groupId>
<artifactId>coveralls-maven-plugin</artifactId>
<version>4.3.0</version>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.7.8</version>
<executions>
<execution>
<id>default-prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>default-prepare-agent-integration</id>
<goals>
<goal>prepare-agent-integration</goal>
</goals>
</execution>
<execution>
<id>default-report</id>
<goals>
<goal>report</goal>
</goals>
</execution>
<execution>
<id>default-report-integration</id>
<goals>
<goal>report-integration</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>
<plugin>
<groupId>org.sonarsource.scanner.maven</groupId>
<artifactId>sonar-maven-plugin</artifactId>
<version>3.0.2</version>
</plugin>
</plugins>
</build>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.admin;
import org.apache.rocketmq.client.ClientConfig;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command;
import org.apache.rocketmq.client.ClientConfig;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册