diff --git a/pom.xml b/pom.xml index decb3525f1b62b04c9972cc6bf4be2646934f546..fa4a0be0f3ac9bd625329518ae77fe7d2dca5ccd 100644 --- a/pom.xml +++ b/pom.xml @@ -242,7 +242,6 @@ com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java com/alibaba/rocketmq/store/RecoverTest.java - com/alibaba/rocketmq/broker/api/SendMessageTest.java diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java new file mode 100644 index 0000000000000000000000000000000000000000..ca6f17b97afc0f1b5b742afd075899ca3e7a8b21 --- /dev/null +++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.broker; + +import com.alibaba.rocketmq.common.BrokerConfig; +import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; +import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Random; + +/** + * @author zander + */ +public class BrokerTestHarness { + + protected BrokerController brokerController = null; + + protected Random random = new Random(); + public final String BROKER_NAME = "TestBrokerName"; + protected String brokerAddr = ""; + protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class); + protected BrokerConfig brokerConfig = new BrokerConfig(); + protected NettyServerConfig nettyServerConfig = new NettyServerConfig(); + protected NettyClientConfig nettyClientConfig = new NettyClientConfig(); + protected MessageStoreConfig storeConfig = new MessageStoreConfig(); + + @Before + public void startup() throws Exception { + brokerConfig.setBrokerName(BROKER_NAME); + brokerConfig.setBrokerIP1("127.0.0.1"); + storeConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore"); + storeConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog"); + nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); + brokerAddr = brokerConfig.getBrokerIP1() + ":" + nettyServerConfig.getListenPort(); + brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); + boolean initResult = brokerController.initialize(); + Assert.assertTrue(initResult); + logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); + brokerController.start(); + } + + @After + public void shutdown() throws Exception { + if (brokerController != null) { + brokerController.shutdown(); + } + //maybe need to clean the file store. But we do not suggest deleting anything. + } +} diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java index 34ebfa5c5d165180a56470570bee4854fda63678..d9babc29d348ab350920bb73d2f5f0d808027691 100644 --- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java +++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java @@ -20,68 +20,68 @@ */ package com.alibaba.rocketmq.broker.api; -import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.broker.BrokerTestHarness; +import com.alibaba.rocketmq.client.ClientConfig; import com.alibaba.rocketmq.client.hook.SendMessageContext; import com.alibaba.rocketmq.client.impl.CommunicationMode; import com.alibaba.rocketmq.client.impl.MQClientAPIImpl; import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.BrokerConfig; +import com.alibaba.rocketmq.client.producer.SendStatus; import com.alibaba.rocketmq.common.MixAll; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageDecoder; import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; + /** - * @author shijia.wxr + * @author zander */ -public class SendMessageTest { - @Test - public void test_sendMessage() throws Exception { - BrokerController brokerController = new BrokerController(// - new BrokerConfig(), // - new NettyServerConfig(), // - new NettyClientConfig(), // - new MessageStoreConfig()); - boolean initResult = brokerController.initialize(); - System.out.println("initialize " + initResult); +public class SendMessageTest extends BrokerTestHarness{ - brokerController.start(); + MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig()); + String topic = "UnitTestTopic"; - MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, null); + @Before + @Override + public void startup() throws Exception { + super.startup(); client.start(); - for (int i = 0; i < 100; i++) { - String topic = "UnitTestTopic_" + i % 3; - Message msg = new Message(topic, "TAG1 TAG2", "100200300", ("Hello, Nice world\t" + i).getBytes()); - msg.setDelayTimeLevel(i % 3 + 1); - - try { - SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); - requestHeader.setProducerGroup("abc"); - requestHeader.setTopic(msg.getTopic()); - requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC); - requestHeader.setDefaultTopicQueueNums(4); - requestHeader.setQueueId(i % 4); - requestHeader.setSysFlag(0); - requestHeader.setBornTimestamp(System.currentTimeMillis()); - requestHeader.setFlag(msg.getFlag()); - requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); - - SendResult result = client.sendMessage("127.0.0.1:10911", "brokerName", msg, requestHeader, 1000 * 5, - CommunicationMode.SYNC, new SendMessageContext(), null); - System.out.println(i + "\t" + result); - } catch (Exception e) { - e.printStackTrace(); - } - } + } + @After + @Override + public void shutdown() throws Exception { client.shutdown(); + super.shutdown(); + } - brokerController.shutdown(); + @Test + public void testSendSingle() throws Exception { + Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes()); + try { + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setProducerGroup("abc"); + requestHeader.setTopic(msg.getTopic()); + requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC); + requestHeader.setDefaultTopicQueueNums(4); + requestHeader.setQueueId(0); + requestHeader.setSysFlag(0); + requestHeader.setBornTimestamp(System.currentTimeMillis()); + requestHeader.setFlag(msg.getFlag()); + requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); + + SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5, + CommunicationMode.SYNC, new SendMessageContext(), null); + assertTrue(result.getSendStatus() == SendStatus.SEND_OK); + } catch (Exception e) { + e.printStackTrace(); + } } } diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java index 55844eb8e56218e869adbf2ce21f704be74741c6..94504a487190474445cea30a5f1c66873ac34d14 100644 --- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java +++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -20,49 +20,46 @@ */ package com.alibaba.rocketmq.broker.offset; -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.common.BrokerConfig; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import com.alibaba.rocketmq.broker.BrokerTestHarness; import org.junit.Test; -import java.util.Random; +import static org.junit.Assert.assertEquals; /** - * @author shijia.wxr + * @author zander */ -public class ConsumerOffsetManagerTest { - @Test - public void test_flushConsumerOffset() throws Exception { - BrokerController brokerController = new BrokerController(// - new BrokerConfig(), // - new NettyServerConfig(), // - new NettyClientConfig(), // - new MessageStoreConfig()); - boolean initResult = brokerController.initialize(); - System.out.println("initialize " + initResult); - brokerController.start(); +public class ConsumerOffsetManagerTest extends BrokerTestHarness{ + @Test + public void testFlushConsumerOffset() throws Exception { ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController); - - Random random = new Random(); - - for (int i = 0; i < 100; i++) { - String group = "DIANPU_GROUP_" + i; - for (int id = 0; id < 16; id++) { - consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, - random.nextLong() % 1024 * 1024 * 1024); - consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, - random.nextLong() % 1024 * 1024 * 1024); - consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, - random.nextLong() % 1024 * 1024 * 1024); + for (int i = 0; i < 10; i++) { + String group = "UNIT_TEST_GROUP_" + i; + for (int id = 0; id < 10; id++) { + consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, id + 100); + consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, id + 100); + consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, id + 100); } } - consumerOffsetManager.persist(); - - brokerController.shutdown(); + consumerOffsetManager.getOffsetTable().clear(); + for (int i = 0; i < 10; i++) { + String group = "UNIT_TEST_GROUP_" + i; + for (int id = 0; id < 10; id++) { + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), -1); + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1); + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1); + } + } + consumerOffsetManager.load(); + for (int i = 0; i < 10; i++) { + String group = "UNIT_TEST_GROUP_" + i; + for (int id = 0; id < 10; id++) { + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), id + 100); + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100); + assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100); + } + } } } diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java index 9edd02e0083ecea05fbe7c01fae105f04555045d..7a6503fe05301157f43388994bc7bbe1c54336c5 100644 --- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java +++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java @@ -20,52 +20,43 @@ */ package com.alibaba.rocketmq.broker.topic; -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.common.BrokerConfig; +import com.alibaba.rocketmq.broker.BrokerTestHarness; import com.alibaba.rocketmq.common.MixAll; import com.alibaba.rocketmq.common.TopicConfig; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; import org.junit.Test; import static org.junit.Assert.assertTrue; /** - * @author shijia.wxr + * @author zander */ -public class TopicConfigManagerTest { +public class TopicConfigManagerTest extends BrokerTestHarness { @Test - public void test_flushTopicConfig() throws Exception { - BrokerController brokerController = new BrokerController(// - new BrokerConfig(), // - new NettyServerConfig(), // - new NettyClientConfig(), // - new MessageStoreConfig()); - boolean initResult = brokerController.initialize(); - System.out.println("initialize " + initResult); - brokerController.start(); - + public void testFlushTopicConfig() throws Exception { TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController); - TopicConfig topicConfig = - topicConfigManager.createTopicInSendMessageMethod("TestTopic_SEND", MixAll.DEFAULT_TOPIC, - null, 4, 0); - assertTrue(topicConfig != null); - - System.out.println(topicConfig); - for (int i = 0; i < 10; i++) { String topic = "UNITTEST-" + i; - topicConfig = - topicConfigManager - .createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0); + TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0); assertTrue(topicConfig != null); } - topicConfigManager.persist(); - brokerController.shutdown(); + topicConfigManager.getTopicConfigTable().clear(); + + for (int i = 0; i < 10; i++) { + String topic = "UNITTEST-" + i; + TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic); + assertTrue(topicConfig == null); + } + topicConfigManager.load(); + for (int i = 0; i < 10; i++) { + String topic = "UNITTEST-" + i; + TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic); + assertTrue(topicConfig != null); + assertTrue(topicConfig.getTopicSysFlag() == 0); + assertTrue(topicConfig.getReadQueueNums() == 4); + } } } diff --git a/rocketmq-broker/src/test/resources/logback-test.xml b/rocketmq-broker/src/test/resources/logback-test.xml new file mode 100644 index 0000000000000000000000000000000000000000..3481c93176681758a7a9f3ef558c47952b9f2a36 --- /dev/null +++ b/rocketmq-broker/src/test/resources/logback-test.xml @@ -0,0 +1,33 @@ + + + + + + + true + + %d{yyy-MM-dd HH\:mm\:ss,GMT+8} %p %t - %m%n + UTF-8 + + + + + + + +