提交 c549854f 编写于 作者: Y yukon

[ROCKETMQ-51] Remove heavy unit tests in rocketmq-broker

上级 6511bc32
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.broker;
import java.io.File;
import java.util.Random;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerTestHarness {
public final String BROKER_NAME = "TestBrokerName";
protected BrokerController brokerController = null;
protected Random random = new Random();
protected String brokerAddr = "";
protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class);
protected BrokerConfig brokerConfig = new BrokerConfig();
protected NettyServerConfig nettyServerConfig = new NettyServerConfig();
protected NettyClientConfig nettyClientConfig = new NettyClientConfig();
protected MessageStoreConfig storeConfig = new MessageStoreConfig();
@Before
public void startup() throws Exception {
brokerConfig.setBrokerName(BROKER_NAME);
brokerConfig.setBrokerIP1("127.0.0.1");
storeConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore");
storeConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog");
nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
brokerAddr = brokerConfig.getBrokerIP1() + ":" + nettyServerConfig.getListenPort();
brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
boolean initResult = brokerController.initialize();
Assert.assertTrue(initResult);
logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
brokerController.start();
}
@After
public void shutdown() throws Exception {
if (brokerController != null) {
brokerController.shutdown();
}
//maybe need to clean the file store. But we do not suggest deleting anything.
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.api;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.junit.Assert;
import org.junit.Test;
public class BrokerFastFailureTest extends BrokerTestHarness {
@Test
public void testHeadSlowTimeMills() throws InterruptedException {
BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue.add(new FutureTaskExt<>(new RequestTask(null, null, null), null));
TimeUnit.MILLISECONDS.sleep(10);
Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) > 0);
blockingQueue.clear();
blockingQueue.add(new Runnable() {
@Override public void run() {
}
});
Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) == 0);
}
@Test
public void testCastRunnable() {
Runnable runnable = new Runnable() {
@Override public void run() {
}
};
Assert.assertNull(BrokerFastFailure.castRunnable(runnable));
RequestTask requestTask = new RequestTask(null, null, null);
runnable = new FutureTaskExt<>(requestTask, null);
Assert.assertEquals(requestTask, BrokerFastFailure.castRunnable(runnable));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.broker.api;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class SendMessageTest extends BrokerTestHarness {
MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
String topic = "UnitTestTopic";
@Before
@Override
public void startup() throws Exception {
super.startup();
client.start();
}
@After
@Override
public void shutdown() throws Exception {
client.shutdown();
super.shutdown();
}
@Test
public void testSendSingle() throws Exception {
Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup("abc");
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
requestHeader.setDefaultTopicQueueNums(4);
requestHeader.setQueueId(0);
requestHeader.setSysFlag(0);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
CommunicationMode.SYNC, new SendMessageContext(), null);
assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.broker.offset;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ConsumerOffsetManagerTest extends BrokerTestHarness {
@Test
public void testFlushConsumerOffset() throws Exception {
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
for (int i = 0; i < 10; i++) {
String group = "UNIT_TEST_GROUP_" + i;
for (int id = 0; id < 10; id++) {
consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, id + 100);
consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, id + 100);
consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, id + 100);
}
}
consumerOffsetManager.persist();
consumerOffsetManager.getOffsetTable().clear();
for (int i = 0; i < 10; i++) {
String group = "UNIT_TEST_GROUP_" + i;
for (int id = 0; id < 10; id++) {
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), -1);
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1);
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1);
}
}
consumerOffsetManager.load();
for (int i = 0; i < 10; i++) {
String group = "UNIT_TEST_GROUP_" + i;
for (int id = 0; id < 10; id++) {
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), id + 100);
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100);
assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.broker.topic;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class TopicConfigManagerTest extends BrokerTestHarness {
@Test
public void testFlushTopicConfig() throws Exception {
TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
assertNotNull(topicConfig);
}
topicConfigManager.persist();
topicConfigManager.getTopicConfigTable().clear();
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
assertNull(topicConfig);
}
topicConfigManager.load();
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
assertNotNull(topicConfig);
assertEquals(topicConfig.getTopicSysFlag(), 0);
assertEquals(topicConfig.getReadQueueNums(), 4);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册